| /* |
| * librdkafka - Apache Kafka C library |
| * |
| * Copyright (c) 2012-2015, Magnus Edenhill |
| * All rights reserved. |
| * |
| * Redistribution and use in source and binary forms, with or without |
| * modification, are permitted provided that the following conditions are met: |
| * |
| * 1. Redistributions of source code must retain the above copyright notice, |
| * this list of conditions and the following disclaimer. |
| * 2. Redistributions in binary form must reproduce the above copyright notice, |
| * this list of conditions and the following disclaimer in the documentation |
| * and/or other materials provided with the distribution. |
| * |
| * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
| * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
| * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
| * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
| * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
| * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
| * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
| * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
| * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
| * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
| * POSSIBILITY OF SUCH DAMAGE. |
| */ |
| |
| #include "rdkafka_int.h" |
| #include "rdkafka_broker.h" |
| #include "rdkafka_request.h" |
| #include "rdkafka_topic.h" |
| #include "rdkafka_partition.h" |
| #include "rdkafka_assignor.h" |
| #include "rdkafka_offset.h" |
| #include "rdkafka_metadata.h" |
| #include "rdkafka_cgrp.h" |
| #include "rdkafka_interceptor.h" |
| |
| |
| static void rd_kafka_cgrp_check_unassign_done (rd_kafka_cgrp_t *rkcg, |
| const char *reason); |
| static void rd_kafka_cgrp_offset_commit_tmr_cb (rd_kafka_timers_t *rkts, |
| void *arg); |
| static void rd_kafka_cgrp_assign (rd_kafka_cgrp_t *rkcg, |
| rd_kafka_topic_partition_list_t *assignment); |
| static rd_kafka_resp_err_t rd_kafka_cgrp_unassign (rd_kafka_cgrp_t *rkcg); |
| static void |
| rd_kafka_cgrp_partitions_fetch_start0 (rd_kafka_cgrp_t *rkcg, |
| rd_kafka_topic_partition_list_t |
| *assignment, int usable_offsets, |
| int line); |
| #define rd_kafka_cgrp_partitions_fetch_start(rkcg,assignment,usable_offsets) \ |
| rd_kafka_cgrp_partitions_fetch_start0(rkcg,assignment,usable_offsets,\ |
| __LINE__) |
| static rd_kafka_op_res_t |
| rd_kafka_cgrp_op_serve (rd_kafka_t *rk, rd_kafka_q_t *rkq, |
| rd_kafka_op_t *rko, rd_kafka_q_cb_type_t cb_type, |
| void *opaque); |
| |
| static void rd_kafka_cgrp_group_leader_reset (rd_kafka_cgrp_t *rkcg, |
| const char *reason); |
| |
| /** |
| * @returns true if cgrp can start partition fetchers, which is true if |
| * there is a subscription and the group is fully joined, or there |
| * is no subscription (in which case the join state is irrelevant) |
| * such as for an assign() without subscribe(). */ |
| #define RD_KAFKA_CGRP_CAN_FETCH_START(rkcg) \ |
| ((rkcg)->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_ASSIGNED) |
| |
| /** |
| * @returns true if cgrp is waiting for a rebalance_cb to be handled by |
| * the application. |
| */ |
| #define RD_KAFKA_CGRP_WAIT_REBALANCE_CB(rkcg) \ |
| ((rkcg)->rkcg_join_state == \ |
| RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_REBALANCE_CB || \ |
| (rkcg)->rkcg_join_state == \ |
| RD_KAFKA_CGRP_JOIN_STATE_WAIT_REVOKE_REBALANCE_CB) |
| |
| |
/* Human-readable names for the cgrp (coordinator) states, indexed by
 * the RD_KAFKA_CGRP_STATE_* enum value; must be kept in sync with it
 * (used for debug logging in rd_kafka_cgrp_set_state() et al). */
const char *rd_kafka_cgrp_state_names[] = {
        "init",
        "term",
        "query-coord",
        "wait-coord",
        "wait-broker",
        "wait-broker-transport",
        "up"
};
| |
/* Human-readable names for the cgrp join states, indexed by the
 * RD_KAFKA_CGRP_JOIN_STATE_* enum value; must be kept in sync with it
 * (used for debug logging in rd_kafka_cgrp_set_join_state() et al). */
const char *rd_kafka_cgrp_join_state_names[] = {
        "init",
        "wait-join",
        "wait-metadata",
        "wait-sync",
        "wait-unassign",
        "wait-assign-rebalance_cb",
	"wait-revoke-rebalance_cb",
        "assigned",
	"started"
};
| |
| |
| static void rd_kafka_cgrp_set_state (rd_kafka_cgrp_t *rkcg, int state) { |
| if ((int)rkcg->rkcg_state == state) |
| return; |
| |
| rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPSTATE", |
| "Group \"%.*s\" changed state %s -> %s " |
| "(v%d, join-state %s)", |
| RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), |
| rd_kafka_cgrp_state_names[rkcg->rkcg_state], |
| rd_kafka_cgrp_state_names[state], |
| rkcg->rkcg_version, |
| rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]); |
| rkcg->rkcg_state = state; |
| rkcg->rkcg_ts_statechange = rd_clock(); |
| |
| rd_kafka_brokers_broadcast_state_change(rkcg->rkcg_rk); |
| } |
| |
| |
| void rd_kafka_cgrp_set_join_state (rd_kafka_cgrp_t *rkcg, int join_state) { |
| if ((int)rkcg->rkcg_join_state == join_state) |
| return; |
| |
| rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPJOINSTATE", |
| "Group \"%.*s\" changed join state %s -> %s " |
| "(v%d, state %s)", |
| RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), |
| rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state], |
| rd_kafka_cgrp_join_state_names[join_state], |
| rkcg->rkcg_version, |
| rd_kafka_cgrp_state_names[rkcg->rkcg_state]); |
| rkcg->rkcg_join_state = join_state; |
| } |
| |
| |
| static RD_INLINE void |
| rd_kafka_cgrp_version_new_barrier0 (rd_kafka_cgrp_t *rkcg, |
| const char *func, int line) { |
| rkcg->rkcg_version++; |
| rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "BARRIER", |
| "Group \"%.*s\": %s:%d: new version barrier v%d", |
| RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), func, line, |
| rkcg->rkcg_version); |
| } |
| |
| #define rd_kafka_cgrp_version_new_barrier(rkcg) \ |
| rd_kafka_cgrp_version_new_barrier0(rkcg, __FUNCTION__, __LINE__) |
| |
| |
/**
 * @brief Final destruction of the cgrp handle.
 *
 * Frees the member id, all queues and lists, and the handle itself.
 * The caller must already have released the assignment, subscription
 * and group-leader state, and all topics/toppars (asserted below).
 */
void rd_kafka_cgrp_destroy_final (rd_kafka_cgrp_t *rkcg) {
        rd_kafka_assert(rkcg->rkcg_rk, !rkcg->rkcg_assignment);
        rd_kafka_assert(rkcg->rkcg_rk, !rkcg->rkcg_subscription);
        rd_kafka_assert(rkcg->rkcg_rk, !rkcg->rkcg_group_leader.members);
        /* Clear the member id (NULL releases any current id). */
        rd_kafka_cgrp_set_member_id(rkcg, NULL);

        rd_kafka_q_destroy(rkcg->rkcg_q);
        rd_kafka_q_destroy(rkcg->rkcg_ops);
        rd_kafka_q_destroy(rkcg->rkcg_wait_coord_q);
        rd_kafka_assert(rkcg->rkcg_rk, TAILQ_EMPTY(&rkcg->rkcg_topics));
        rd_kafka_assert(rkcg->rkcg_rk, rd_list_empty(&rkcg->rkcg_toppars));
        rd_list_destroy(&rkcg->rkcg_toppars);
        rd_list_destroy(rkcg->rkcg_subscribed_topics);
        rd_free(rkcg);
}
| |
| |
| |
| |
/**
 * @brief Create a new consumer group handle.
 *
 * @param group_id  group.id; a NULL kafka-string means no group is
 *                  configured and the cgrp operates in legacy/
 *                  SimpleConsumer mode.
 * @param client_id client.id, used in group protocol requests.
 *
 * @returns a newly allocated cgrp handle (owned by the caller/rk).
 *
 * NOTE(review): \p group_id and \p client_id are stored by reference,
 * so they must outlive the cgrp — presumably owned by the rk conf;
 * confirm with the caller.
 */
rd_kafka_cgrp_t *rd_kafka_cgrp_new (rd_kafka_t *rk,
                                    const rd_kafkap_str_t *group_id,
                                    const rd_kafkap_str_t *client_id) {
        rd_kafka_cgrp_t *rkcg;

        rkcg = rd_calloc(1, sizeof(*rkcg));

        rkcg->rkcg_rk = rk;
        rkcg->rkcg_group_id = group_id;
        rkcg->rkcg_client_id = client_id;
        rkcg->rkcg_coord_id = -1;        /* -1 = no coordinator known yet */
        rkcg->rkcg_generation_id = -1;   /* -1 = not in a generation yet */
        rkcg->rkcg_version = 1;

        mtx_init(&rkcg->rkcg_lock, mtx_plain);
        /* Internal op queue, served by rd_kafka_cgrp_op_serve(). */
        rkcg->rkcg_ops = rd_kafka_q_new(rk);
        rkcg->rkcg_ops->rkq_serve = rd_kafka_cgrp_op_serve;
        rkcg->rkcg_ops->rkq_opaque = rkcg;
        /* Holding queue for ops that must wait for a coordinator;
         * served by the same callback as rkcg_ops. */
        rkcg->rkcg_wait_coord_q = rd_kafka_q_new(rk);
        rkcg->rkcg_wait_coord_q->rkq_serve = rkcg->rkcg_ops->rkq_serve;
        rkcg->rkcg_wait_coord_q->rkq_opaque = rkcg->rkcg_ops->rkq_opaque;
        /* Application-facing (consumer) queue. */
        rkcg->rkcg_q = rd_kafka_q_new(rk);

        TAILQ_INIT(&rkcg->rkcg_topics);
        rd_list_init(&rkcg->rkcg_toppars, 32, NULL);
        /* Empty member id until assigned by the broker in JoinGroup. */
        rd_kafka_cgrp_set_member_id(rkcg, "");
        rkcg->rkcg_subscribed_topics =
                rd_list_new(0, (void *)rd_kafka_topic_info_destroy);
        rd_interval_init(&rkcg->rkcg_coord_query_intvl);
        rd_interval_init(&rkcg->rkcg_heartbeat_intvl);
        rd_interval_init(&rkcg->rkcg_join_intvl);
        rd_interval_init(&rkcg->rkcg_timeout_scan_intvl);

        if (RD_KAFKAP_STR_IS_NULL(group_id)) {
                /* No group configured: Operate in legacy/SimpleConsumer mode */
                rd_kafka_simple_consumer_add(rk);
                /* no need look up group coordinator (no queries) */
                rd_interval_disable(&rkcg->rkcg_coord_query_intvl);
        }

        /* Periodic auto-commit timer (interval is in microseconds). */
        if (rk->rk_conf.enable_auto_commit &&
            rk->rk_conf.auto_commit_interval_ms > 0)
                rd_kafka_timer_start(&rk->rk_timers,
                                     &rkcg->rkcg_offset_commit_tmr,
                                     rk->rk_conf.
				     auto_commit_interval_ms * 1000ll,
                                     rd_kafka_cgrp_offset_commit_tmr_cb,
                                     rkcg);

        return rkcg;
}
| |
| |
| |
| /** |
| * Select a broker to handle this cgrp. |
| * It will prefer the coordinator broker but if that is not available |
| * any other broker that is Up will be used, and if that also fails |
| * uses the internal broker handle. |
| * |
| * NOTE: The returned rkb will have had its refcnt increased. |
| */ |
/**
 * Select a broker to handle this cgrp.
 * It will prefer the coordinator broker but if that is not available
 * any other broker that is Up will be used, and if that also fails
 * uses the internal broker handle.
 *
 * @returns the selected broker, or NULL if the cgrp is terminated.
 *
 * NOTE: The returned rkb will have had its refcnt increased.
 */
static rd_kafka_broker_t *rd_kafka_cgrp_select_broker (rd_kafka_cgrp_t *rkcg) {
        rd_kafka_broker_t *rkb = NULL;


        /* No need for a managing broker when cgrp is terminated */
        if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_TERM)
                return NULL;

        rd_kafka_rdlock(rkcg->rkcg_rk);
        /* Try to find the coordinator broker, if it isn't found
         * move the cgrp to any other Up broker which will
         * do further coord querying while waiting for the
         * proper broker to materialise.
         * If that also fails, go with the internal broker */
        if (rkcg->rkcg_coord_id != -1)
                rkb = rd_kafka_broker_find_by_nodeid(rkcg->rkcg_rk,
                                                     rkcg->rkcg_coord_id);
        if (!rkb)
                rkb = rd_kafka_broker_prefer(rkcg->rkcg_rk,
                                             rkcg->rkcg_coord_id,
                                             RD_KAFKA_BROKER_STATE_UP);
        if (!rkb)
                rkb = rd_kafka_broker_internal(rkcg->rkcg_rk);

        rd_kafka_rdunlock(rkcg->rkcg_rk);

        /* Dont change managing broker unless warranted.
         * This means do not change to another non-coordinator broker
         * while we are waiting for the proper coordinator broker to
         * become available. */
        if (rkb && rkcg->rkcg_rkb && rkb != rkcg->rkcg_rkb) {
                int old_is_coord, new_is_coord;

                /* Broker lock held while checking coordinatorship since
                 * RD_KAFKA_CGRP_BROKER_IS_COORD reads broker state. */
                rd_kafka_broker_lock(rkb);
                new_is_coord = RD_KAFKA_CGRP_BROKER_IS_COORD(rkcg, rkb);
                rd_kafka_broker_unlock(rkb);

                rd_kafka_broker_lock(rkcg->rkcg_rkb);
                old_is_coord = RD_KAFKA_CGRP_BROKER_IS_COORD(rkcg,
                                                             rkcg->rkcg_rkb);
                rd_kafka_broker_unlock(rkcg->rkcg_rkb);

                if (!old_is_coord && !new_is_coord &&
                    rkcg->rkcg_rkb->rkb_source != RD_KAFKA_INTERNAL) {
                        /* Neither broker is the coordinator and the current
                         * one is not the internal placeholder: keep the
                         * current broker. Swap refcounts: drop the
                         * candidate's ref, take a new ref on the current. */
                        rd_kafka_broker_destroy(rkb);
                        rkb = rkcg->rkcg_rkb;
                        rd_kafka_broker_keep(rkb);
                }
        }

        return rkb;
}
| |
| |
| |
| |
| /** |
| * Assign cgrp to broker. |
| * |
| * Locality: rdkafka main thread |
| */ |
| static void rd_kafka_cgrp_assign_broker (rd_kafka_cgrp_t *rkcg, |
| rd_kafka_broker_t *rkb) { |
| |
| rd_kafka_assert(NULL, rkcg->rkcg_rkb == NULL); |
| |
| rkcg->rkcg_rkb = rkb; |
| rd_kafka_broker_keep(rkb); |
| |
| rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "BRKASSIGN", |
| "Group \"%.*s\" management assigned to broker %s", |
| RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), |
| rd_kafka_broker_name(rkb)); |
| |
| /* Reset query interval to trigger an immediate |
| * coord query if required */ |
| if (!rd_interval_disabled(&rkcg->rkcg_coord_query_intvl)) |
| rd_interval_reset(&rkcg->rkcg_coord_query_intvl); |
| |
| if (RD_KAFKA_CGRP_BROKER_IS_COORD(rkcg, rkb)) |
| rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_WAIT_BROKER_TRANSPORT); |
| |
| } |
| |
| |
| /** |
| * Unassign cgrp from current broker. |
| * |
| * Locality: main thread |
| */ |
| static void rd_kafka_cgrp_unassign_broker (rd_kafka_cgrp_t *rkcg) { |
| rd_kafka_broker_t *rkb = rkcg->rkcg_rkb; |
| |
| rd_kafka_assert(NULL, rkcg->rkcg_rkb); |
| rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "BRKUNASSIGN", |
| "Group \"%.*s\" management unassigned " |
| "from broker handle %s", |
| RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), |
| rd_kafka_broker_name(rkb)); |
| |
| rkcg->rkcg_rkb = NULL; |
| rd_kafka_broker_destroy(rkb); /* from assign() */ |
| } |
| |
| |
| /** |
| * Assign cgrp to a broker to handle. |
| * It will prefer the coordinator broker but if that is not available |
| * any other broker that is Up will be used, and if that also fails |
| * uses the internal broker handle. |
| * |
| * Returns 1 if the cgrp was reassigned, else 0. |
| */ |
| int rd_kafka_cgrp_reassign_broker (rd_kafka_cgrp_t *rkcg) { |
| rd_kafka_broker_t *rkb; |
| |
| rkb = rd_kafka_cgrp_select_broker(rkcg); |
| |
| if (rkb == rkcg->rkcg_rkb) { |
| int is_coord = 0; |
| |
| if (rkb) { |
| rd_kafka_broker_lock(rkb); |
| is_coord = RD_KAFKA_CGRP_BROKER_IS_COORD(rkcg, rkb); |
| rd_kafka_broker_unlock(rkb); |
| } |
| if (is_coord) |
| rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_WAIT_BROKER_TRANSPORT); |
| else |
| rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_WAIT_BROKER); |
| |
| if (rkb) |
| rd_kafka_broker_destroy(rkb); |
| return 0; /* No change */ |
| } |
| |
| rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "BRKREASSIGN", |
| "Group \"%.*s\" management reassigned from " |
| "broker %s to %s", |
| RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), |
| rkcg->rkcg_rkb ? |
| rd_kafka_broker_name(rkcg->rkcg_rkb) : "(none)", |
| rkb ? rd_kafka_broker_name(rkb) : "(none)"); |
| |
| |
| if (rkcg->rkcg_rkb) |
| rd_kafka_cgrp_unassign_broker(rkcg); |
| |
| rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_WAIT_BROKER); |
| |
| if (rkb) { |
| rd_kafka_cgrp_assign_broker(rkcg, rkb); |
| rd_kafka_broker_destroy(rkb); /* from select_broker() */ |
| } |
| |
| return 1; |
| } |
| |
| |
| /** |
| * Update the cgrp's coordinator and move it to that broker. |
| */ |
| void rd_kafka_cgrp_coord_update (rd_kafka_cgrp_t *rkcg, int32_t coord_id) { |
| |
| if (rkcg->rkcg_coord_id == coord_id) { |
| if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_WAIT_COORD) |
| rd_kafka_cgrp_set_state(rkcg, |
| RD_KAFKA_CGRP_STATE_WAIT_BROKER); |
| return; |
| } |
| |
| rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPCOORD", |
| "Group \"%.*s\" changing coordinator %"PRId32" -> %"PRId32, |
| RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), rkcg->rkcg_coord_id, |
| coord_id); |
| rkcg->rkcg_coord_id = coord_id; |
| |
| rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_WAIT_BROKER); |
| |
| rd_kafka_cgrp_reassign_broker(rkcg); |
| } |
| |
| |
| |
| |
| |
| |
| /** |
| * Handle GroupCoordinator response |
| */ |
/**
 * @brief Handle GroupCoordinator response.
 *
 * On success the coordinator broker is added/updated in the broker list
 * and the cgrp's coordinator id is updated. On failure the error is
 * propagated to the consumer queue (suppressing repeats) and the cgrp
 * goes back to querying for the coordinator.
 *
 * \p opaque is the cgrp handle.
 */
static void rd_kafka_cgrp_handle_GroupCoordinator (rd_kafka_t *rk,
                                                   rd_kafka_broker_t *rkb,
                                                   rd_kafka_resp_err_t err,
                                                   rd_kafka_buf_t *rkbuf,
                                                   rd_kafka_buf_t *request,
                                                   void *opaque) {
        const int log_decode_errors = LOG_ERR;
        int16_t ErrorCode = 0;
        int32_t CoordId;
        rd_kafkap_str_t CoordHost = RD_ZERO_INIT;
        int32_t CoordPort;
        rd_kafka_cgrp_t *rkcg = opaque;
        struct rd_kafka_metadata_broker mdb = RD_ZERO_INIT;

        /* NOTE: the rd_kafka_buf_read_*() macros jump to the err_parse
         * label on decode failure. */
        if (likely(!(ErrorCode = err))) {
                rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
                rd_kafka_buf_read_i32(rkbuf, &CoordId);
                rd_kafka_buf_read_str(rkbuf, &CoordHost);
                rd_kafka_buf_read_i32(rkbuf, &CoordPort);
        }

        if (ErrorCode)
                goto err2;


        /* Add/update the coordinator broker from the response fields. */
        mdb.id = CoordId;
	RD_KAFKAP_STR_DUPA(&mdb.host, &CoordHost);
	mdb.port = CoordPort;

        rd_rkb_dbg(rkb, CGRP, "CGRPCOORD",
                   "Group \"%.*s\" coordinator is %s:%i id %"PRId32,
                   RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                   mdb.host, mdb.port, mdb.id);
        rd_kafka_broker_update(rkb->rkb_rk, rkb->rkb_proto, &mdb);

        rd_kafka_cgrp_coord_update(rkcg, CoordId);
        rd_kafka_cgrp_serve(rkcg); /* Serve updated state, if possible */
        return;

err_parse: /* Parse error */
        ErrorCode = rkbuf->rkbuf_err;
        /* FALLTHRU */

err2:
        rd_rkb_dbg(rkb, CGRP, "CGRPCOORD",
                   "Group \"%.*s\" GroupCoordinator response error: %s",
                   RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                   rd_kafka_err2str(ErrorCode));

        /* Client is terminating: drop the response. */
        if (ErrorCode == RD_KAFKA_RESP_ERR__DESTROY)
                return;

        if (ErrorCode == RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE)
                rd_kafka_cgrp_coord_update(rkcg, -1);
        else {
                if (rkcg->rkcg_last_err != ErrorCode) {
                        rd_kafka_q_op_err(rkcg->rkcg_q,
                                          RD_KAFKA_OP_CONSUMER_ERR,
                                          ErrorCode, 0, NULL, 0,
                                          "GroupCoordinator response error: %s",
                                          rd_kafka_err2str(ErrorCode));

                        /* Suppress repeated errors */
                        rkcg->rkcg_last_err = ErrorCode;
                }

                /* Continue querying */
                rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_QUERY_COORD);
        }

        rd_kafka_cgrp_serve(rkcg); /* Serve updated state, if possible */
}
| |
| |
| /** |
| * Query for coordinator. |
| * Ask any broker in state UP |
| * |
| * Locality: main thread |
| */ |
| void rd_kafka_cgrp_coord_query (rd_kafka_cgrp_t *rkcg, |
| const char *reason) { |
| rd_kafka_broker_t *rkb; |
| |
| rd_kafka_rdlock(rkcg->rkcg_rk); |
| rkb = rd_kafka_broker_any(rkcg->rkcg_rk, RD_KAFKA_BROKER_STATE_UP, |
| rd_kafka_broker_filter_can_group_query, NULL); |
| rd_kafka_rdunlock(rkcg->rkcg_rk); |
| |
| if (!rkb) { |
| rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPQUERY", |
| "Group \"%.*s\": " |
| "no broker available for coordinator query: %s", |
| RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), reason); |
| return; |
| } |
| |
| rd_rkb_dbg(rkb, CGRP, "CGRPQUERY", |
| "Group \"%.*s\": querying for coordinator: %s", |
| RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), reason); |
| |
| rd_kafka_GroupCoordinatorRequest(rkb, rkcg->rkcg_group_id, |
| RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0), |
| rd_kafka_cgrp_handle_GroupCoordinator, |
| rkcg); |
| |
| if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_QUERY_COORD) |
| rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_WAIT_COORD); |
| |
| rd_kafka_broker_destroy(rkb); |
| } |
| |
| /** |
| * @brief Mark the current coordinator as dead. |
| * |
| * @locality main thread |
| */ |
| void rd_kafka_cgrp_coord_dead (rd_kafka_cgrp_t *rkcg, rd_kafka_resp_err_t err, |
| const char *reason) { |
| rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "COORD", |
| "Group \"%.*s\": marking the coordinator dead: %s: %s", |
| RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), |
| rd_kafka_err2str(err), reason); |
| |
| rd_kafka_cgrp_coord_update(rkcg, -1); |
| |
| /* Re-query for coordinator */ |
| rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_QUERY_COORD); |
| rd_kafka_cgrp_coord_query(rkcg, reason); |
| } |
| |
| |
| |
| static void rd_kafka_cgrp_leave (rd_kafka_cgrp_t *rkcg, int ignore_response) { |
| rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "LEAVE", |
| "Group \"%.*s\": leave", |
| RD_KAFKAP_STR_PR(rkcg->rkcg_group_id)); |
| |
| if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_UP) |
| rd_kafka_LeaveGroupRequest(rkcg->rkcg_rkb, rkcg->rkcg_group_id, |
| rkcg->rkcg_member_id, |
| ignore_response ? |
| RD_KAFKA_NO_REPLYQ : |
| RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0), |
| ignore_response ? NULL : |
| rd_kafka_handle_LeaveGroup, rkcg); |
| else if (!ignore_response) |
| rd_kafka_handle_LeaveGroup(rkcg->rkcg_rk, rkcg->rkcg_rkb, |
| RD_KAFKA_RESP_ERR__WAIT_COORD, |
| NULL, NULL, rkcg); |
| } |
| |
| |
| /** |
| * Enqueue a rebalance op (if configured). 'partitions' is copied. |
| * This delegates the responsibility of assign() and unassign() to the |
| * application. |
| * |
| * Returns 1 if a rebalance op was enqueued, else 0. |
| * Returns 0 if there was no rebalance_cb or 'assignment' is NULL, |
| * in which case rd_kafka_cgrp_assign(rkcg,assignment) is called immediately. |
| */ |
/**
 * Enqueue a rebalance op (if configured). 'partitions' is copied.
 * This delegates the responsibility of assign() and unassign() to the
 * application.
 *
 * @param err RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS or
 *            RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS, selecting whether
 *            this is an assign or a revoke.
 *
 * Returns 1 if a rebalance op was enqueued, else 0.
 * Returns 0 if there was no rebalance_cb or 'assignment' is NULL,
 * in which case rd_kafka_cgrp_assign(rkcg,assignment) is called immediately.
 */
static int
rd_kafka_rebalance_op (rd_kafka_cgrp_t *rkcg,
		       rd_kafka_resp_err_t err,
		       rd_kafka_topic_partition_list_t *assignment,
		       const char *reason) {
	rd_kafka_op_t *rko;

        /* Update rebalance statistics (rk lock protects rkcg_c). */
        rd_kafka_wrlock(rkcg->rkcg_rk);
        rkcg->rkcg_c.ts_rebalance = rd_clock();
        rkcg->rkcg_c.rebalance_cnt++;
        rd_kafka_wrunlock(rkcg->rkcg_rk);

	/* Pause current partition set consumers until new assign() is called */
	if (rkcg->rkcg_assignment)
		rd_kafka_toppars_pause_resume(rkcg->rkcg_rk, 1,
					      RD_KAFKA_TOPPAR_F_LIB_PAUSE,
					      rkcg->rkcg_assignment);

        /* If the application has not enabled rebalance events (no
         * rebalance_cb / event) or there is no assignment, handle the
         * (un)assignment internally right here. */
	if (!(rkcg->rkcg_rk->rk_conf.enabled_events & RD_KAFKA_EVENT_REBALANCE)
	    || !assignment) {
	no_delegation:
		if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS)
			rd_kafka_cgrp_assign(rkcg, assignment);
		else
			rd_kafka_cgrp_unassign(rkcg);
		return 0;
	}

	rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGN",
		     "Group \"%s\": delegating %s of %d partition(s) "
		     "to application rebalance callback on queue %s: %s",
		     rkcg->rkcg_group_id->str,
		     err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS ?
		     "revoke":"assign", assignment->cnt,
		     rd_kafka_q_dest_name(rkcg->rkcg_q), reason);

        /* Wait for the application to call assign()/unassign() from
         * its rebalance callback. */
	rd_kafka_cgrp_set_join_state(
		rkcg,
		err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS ?
		RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_REBALANCE_CB :
		RD_KAFKA_CGRP_JOIN_STATE_WAIT_REVOKE_REBALANCE_CB);

	rko = rd_kafka_op_new(RD_KAFKA_OP_REBALANCE);
	rko->rko_err = err;
	rko->rko_u.rebalance.partitions =
		rd_kafka_topic_partition_list_copy(assignment);

	if (rd_kafka_q_enq(rkcg->rkcg_q, rko) == 0) {
		/* Queue disabled, handle assignment here. */
		goto no_delegation;
	}

	return 1;
}
| |
| |
| /** |
| * @brief Run group assignment. |
| */ |
/**
 * @brief Run group assignment (leader only).
 *
 * Runs the assignor identified by \p protocol_name over \p members
 * using \p metadata and responds to the coordinator with a SyncGroup
 * request carrying the produced assignment.
 *
 * @param err pre-existing error (e.g. from the Metadata request);
 *            if set the assignor is not run.
 *
 * On failure the error is logged and the join state is reset to INIT
 * (the join will be retried).
 */
static void
rd_kafka_cgrp_assignor_run (rd_kafka_cgrp_t *rkcg,
                            const char *protocol_name,
                            rd_kafka_resp_err_t err,
                            rd_kafka_metadata_t *metadata,
                            rd_kafka_group_member_t *members,
                            int member_cnt) {
        char errstr[512];

        if (err) {
                rd_snprintf(errstr, sizeof(errstr),
                            "Failed to get cluster metadata: %s",
                            rd_kafka_err2str(err));
                goto err;
        }

        *errstr = '\0';

        /* Run assignor */
        err = rd_kafka_assignor_run(rkcg, protocol_name, metadata,
                                    members, member_cnt,
                                    errstr, sizeof(errstr));

        if (err) {
                /* Assignor may or may not have filled in errstr;
                 * fall back to the generic error string. */
                if (!*errstr)
                        rd_snprintf(errstr, sizeof(errstr), "%s",
                                    rd_kafka_err2str(err));
                goto err;
        }

        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGNOR",
                     "Group \"%s\": \"%s\" assignor run for %d member(s)",
                     rkcg->rkcg_group_id->str, protocol_name, member_cnt);

        rd_kafka_cgrp_set_join_state(rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC);

        /* Respond to broker with assignment set or error.
         * (err is always 0 here; the 'err ? 0 : member_cnt' guard is
         *  defensive.) */
        rd_kafka_SyncGroupRequest(rkcg->rkcg_rkb,
                                  rkcg->rkcg_group_id, rkcg->rkcg_generation_id,
                                  rkcg->rkcg_member_id,
                                  members, err ? 0 : member_cnt,
                                  RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
                                  rd_kafka_handle_SyncGroup, rkcg);
        return;

err:
        rd_kafka_log(rkcg->rkcg_rk, LOG_ERR, "ASSIGNOR",
                     "Group \"%s\": failed to run assignor \"%s\" for "
                     "%d member(s): %s",
                     rkcg->rkcg_group_id->str, protocol_name,
                     member_cnt, errstr);

        /* Retry the join from scratch. */
        rd_kafka_cgrp_set_join_state(rkcg, RD_KAFKA_CGRP_JOIN_STATE_INIT);

}
| |
| |
| |
| /** |
| * @brief Op callback from handle_JoinGroup |
| */ |
| static rd_kafka_op_res_t |
| rd_kafka_cgrp_assignor_handle_Metadata_op (rd_kafka_t *rk, |
| rd_kafka_q_t *rkq, |
| rd_kafka_op_t *rko) { |
| rd_kafka_cgrp_t *rkcg = rk->rk_cgrp; |
| |
| if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY) |
| return RD_KAFKA_OP_RES_HANDLED; /* Terminating */ |
| |
| if (rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA) |
| return RD_KAFKA_OP_RES_HANDLED; /* From outdated state */ |
| |
| if (!rkcg->rkcg_group_leader.protocol) { |
| rd_kafka_dbg(rk, CGRP, "GRPLEADER", |
| "Group \"%.*s\": no longer leader: " |
| "not running assignor", |
| RD_KAFKAP_STR_PR(rkcg->rkcg_group_id)); |
| return RD_KAFKA_OP_RES_HANDLED; |
| } |
| |
| rd_kafka_cgrp_assignor_run(rkcg, |
| rkcg->rkcg_group_leader.protocol, |
| rko->rko_err, rko->rko_u.metadata.md, |
| rkcg->rkcg_group_leader.members, |
| rkcg->rkcg_group_leader.member_cnt); |
| |
| return RD_KAFKA_OP_RES_HANDLED; |
| } |
| |
| |
| /** |
| * Parse single JoinGroup.Members.MemberMetadata for "consumer" ProtocolType |
| * |
| * Protocol definition: |
| * https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal |
| * |
| * Returns 0 on success or -1 on error. |
| */ |
/**
 * Parse single JoinGroup.Members.MemberMetadata for "consumer" ProtocolType
 *
 * Protocol definition:
 * https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal
 *
 * On success fills in \p rkgm's rkgm_subscription and rkgm_userdata
 * (both owned by \p rkgm). On failure rkgm_subscription is left NULL.
 *
 * Returns 0 on success or -1 on error.
 */
static int
rd_kafka_group_MemberMetadata_consumer_read (
        rd_kafka_broker_t *rkb, rd_kafka_group_member_t *rkgm,
        const rd_kafkap_str_t *GroupProtocol,
        const rd_kafkap_bytes_t *MemberMetadata) {

        rd_kafka_buf_t *rkbuf;
        int16_t Version;
        int32_t subscription_cnt;
        rd_kafkap_bytes_t UserData;
        const int log_decode_errors = LOG_ERR;
        rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR__BAD_MSG;

        /* Create a shadow-buffer pointing to the metadata to ease parsing.
         * NOTE: the rd_kafka_buf_read_*() macros below jump to err_parse
         * on decode failure. */
        rkbuf = rd_kafka_buf_new_shadow(MemberMetadata->data,
                                        RD_KAFKAP_BYTES_LEN(MemberMetadata),
                                        NULL);

        rd_kafka_buf_read_i16(rkbuf, &Version);
        rd_kafka_buf_read_i32(rkbuf, &subscription_cnt);

        /* Sanity-bound the peer-provided subscription count. */
        if (subscription_cnt > 10000 || subscription_cnt <= 0)
                goto err;

        rkgm->rkgm_subscription =
                rd_kafka_topic_partition_list_new(subscription_cnt);

        while (subscription_cnt-- > 0) {
                rd_kafkap_str_t Topic;
                char *topic_name;
                rd_kafka_buf_read_str(rkbuf, &Topic);
                RD_KAFKAP_STR_DUPA(&topic_name, &Topic);
                rd_kafka_topic_partition_list_add(rkgm->rkgm_subscription,
                                                  topic_name,
                                                  RD_KAFKA_PARTITION_UA);
        }

        rd_kafka_buf_read_bytes(rkbuf, &UserData);
        rkgm->rkgm_userdata = rd_kafkap_bytes_copy(&UserData);

        rd_kafka_buf_destroy(rkbuf);

        return 0;

 err_parse:
        err = rkbuf->rkbuf_err;

 err:
        rd_rkb_dbg(rkb, CGRP, "MEMBERMETA",
                   "Failed to parse MemberMetadata for \"%.*s\": %s",
                   RD_KAFKAP_STR_PR(rkgm->rkgm_member_id),
                   rd_kafka_err2str(err));
        /* Release any partially built subscription list. */
        if (rkgm->rkgm_subscription) {
                rd_kafka_topic_partition_list_destroy(rkgm->
                                                      rkgm_subscription);
                rkgm->rkgm_subscription = NULL;
        }

        rd_kafka_buf_destroy(rkbuf);
        return -1;
}
| |
| |
| |
| |
| /** |
| * @brief cgrp handler for JoinGroup responses |
| * opaque must be the cgrp handle. |
| * |
| * @locality cgrp broker thread |
| */ |
| static void rd_kafka_cgrp_handle_JoinGroup (rd_kafka_t *rk, |
| rd_kafka_broker_t *rkb, |
| rd_kafka_resp_err_t err, |
| rd_kafka_buf_t *rkbuf, |
| rd_kafka_buf_t *request, |
| void *opaque) { |
| rd_kafka_cgrp_t *rkcg = opaque; |
| const int log_decode_errors = LOG_ERR; |
| int16_t ErrorCode = 0; |
| int32_t GenerationId; |
| rd_kafkap_str_t Protocol, LeaderId, MyMemberId; |
| int32_t member_cnt; |
| int actions; |
| int i_am_leader = 0; |
| |
| if (rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_WAIT_JOIN) { |
| rd_kafka_dbg(rkb->rkb_rk, CGRP, "JOINGROUP", |
| "JoinGroup response: discarding outdated request " |
| "(now in join-state %s)", |
| rd_kafka_cgrp_join_state_names[rkcg-> |
| rkcg_join_state]); |
| return; |
| } |
| |
| if (err) { |
| ErrorCode = err; |
| goto err; |
| } |
| |
| rd_kafka_buf_read_i16(rkbuf, &ErrorCode); |
| rd_kafka_buf_read_i32(rkbuf, &GenerationId); |
| rd_kafka_buf_read_str(rkbuf, &Protocol); |
| rd_kafka_buf_read_str(rkbuf, &LeaderId); |
| rd_kafka_buf_read_str(rkbuf, &MyMemberId); |
| rd_kafka_buf_read_i32(rkbuf, &member_cnt); |
| |
| if (!ErrorCode && RD_KAFKAP_STR_IS_NULL(&Protocol)) { |
| /* Protocol not set, we will not be able to find |
| * a matching assignor so error out early. */ |
| ErrorCode = RD_KAFKA_RESP_ERR__BAD_MSG; |
| } |
| |
| rd_kafka_dbg(rkb->rkb_rk, CGRP, "JOINGROUP", |
| "JoinGroup response: GenerationId %"PRId32", " |
| "Protocol %.*s, LeaderId %.*s%s, my MemberId %.*s, " |
| "%"PRId32" members in group: %s", |
| GenerationId, |
| RD_KAFKAP_STR_PR(&Protocol), |
| RD_KAFKAP_STR_PR(&LeaderId), |
| !rd_kafkap_str_cmp(&LeaderId, &MyMemberId) ? " (me)" : "", |
| RD_KAFKAP_STR_PR(&MyMemberId), |
| member_cnt, |
| ErrorCode ? rd_kafka_err2str(ErrorCode) : "(no error)"); |
| |
| if (!ErrorCode) { |
| char *my_member_id; |
| RD_KAFKAP_STR_DUPA(&my_member_id, &MyMemberId); |
| rkcg->rkcg_generation_id = GenerationId; |
| rd_kafka_cgrp_set_member_id(rkcg, my_member_id); |
| i_am_leader = !rd_kafkap_str_cmp(&LeaderId, &MyMemberId); |
| } else { |
| rd_interval_backoff(&rkcg->rkcg_join_intvl, 1000*1000); |
| goto err; |
| } |
| |
| if (i_am_leader) { |
| rd_kafka_group_member_t *members; |
| int i; |
| int sub_cnt = 0; |
| rd_list_t topics; |
| rd_kafka_op_t *rko; |
| rd_kafka_dbg(rkb->rkb_rk, CGRP, "JOINGROUP", |
| "Elected leader for group \"%s\" " |
| "with %"PRId32" member(s)", |
| rkcg->rkcg_group_id->str, member_cnt); |
| |
| if (member_cnt > 100000) { |
| err = RD_KAFKA_RESP_ERR__BAD_MSG; |
| goto err; |
| } |
| |
| rd_list_init(&topics, member_cnt, rd_free); |
| |
| members = rd_calloc(member_cnt, sizeof(*members)); |
| |
| for (i = 0 ; i < member_cnt ; i++) { |
| rd_kafkap_str_t MemberId; |
| rd_kafkap_bytes_t MemberMetadata; |
| rd_kafka_group_member_t *rkgm; |
| |
| rd_kafka_buf_read_str(rkbuf, &MemberId); |
| rd_kafka_buf_read_bytes(rkbuf, &MemberMetadata); |
| |
| rkgm = &members[sub_cnt]; |
| rkgm->rkgm_member_id = rd_kafkap_str_copy(&MemberId); |
| rd_list_init(&rkgm->rkgm_eligible, 0, NULL); |
| |
| if (rd_kafka_group_MemberMetadata_consumer_read( |
| rkb, rkgm, &Protocol, &MemberMetadata)) { |
| /* Failed to parse this member's metadata, |
| * ignore it. */ |
| } else { |
| sub_cnt++; |
| rkgm->rkgm_assignment = |
| rd_kafka_topic_partition_list_new( |
| rkgm->rkgm_subscription->size); |
| rd_kafka_topic_partition_list_get_topic_names( |
| rkgm->rkgm_subscription, &topics, |
| 0/*dont include regex*/); |
| } |
| |
| } |
| |
| /* FIXME: What to do if parsing failed for some/all members? |
| * It is a sign of incompatibility. */ |
| |
| |
| rd_kafka_cgrp_group_leader_reset(rkcg, |
| "JoinGroup response clean-up"); |
| |
| rkcg->rkcg_group_leader.protocol = RD_KAFKAP_STR_DUP(&Protocol); |
| rd_kafka_assert(NULL, rkcg->rkcg_group_leader.members == NULL); |
| rkcg->rkcg_group_leader.members = members; |
| rkcg->rkcg_group_leader.member_cnt = sub_cnt; |
| |
| rd_kafka_cgrp_set_join_state( |
| rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA); |
| |
| /* The assignor will need metadata so fetch it asynchronously |
| * and run the assignor when we get a reply. |
| * Create a callback op that the generic metadata code |
| * will trigger when metadata has been parsed. */ |
| rko = rd_kafka_op_new_cb( |
| rkcg->rkcg_rk, RD_KAFKA_OP_METADATA, |
| rd_kafka_cgrp_assignor_handle_Metadata_op); |
| rd_kafka_op_set_replyq(rko, rkcg->rkcg_ops, NULL); |
| |
| rd_kafka_MetadataRequest(rkb, &topics, |
| "partition assignor", rko); |
| rd_list_destroy(&topics); |
| |
| } else { |
| rd_kafka_cgrp_set_join_state( |
| rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC); |
| |
| rd_kafka_SyncGroupRequest(rkb, rkcg->rkcg_group_id, |
| rkcg->rkcg_generation_id, |
| rkcg->rkcg_member_id, |
| NULL, 0, |
| RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0), |
| rd_kafka_handle_SyncGroup, rkcg); |
| |
| } |
| |
| err: |
| actions = rd_kafka_err_action(rkb, ErrorCode, rkbuf, request, |
| RD_KAFKA_ERR_ACTION_IGNORE, |
| RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID, |
| |
| RD_KAFKA_ERR_ACTION_END); |
| |
| if (actions & RD_KAFKA_ERR_ACTION_REFRESH) { |
| /* Re-query for coordinator */ |
| rd_kafka_cgrp_op(rkcg, NULL, RD_KAFKA_NO_REPLYQ, |
| RD_KAFKA_OP_COORD_QUERY, ErrorCode); |
| } |
| |
| if (ErrorCode) { |
| if (ErrorCode == RD_KAFKA_RESP_ERR__DESTROY) |
| return; /* Termination */ |
| |
| if (actions & RD_KAFKA_ERR_ACTION_PERMANENT) |
| rd_kafka_q_op_err(rkcg->rkcg_q, |
| RD_KAFKA_OP_CONSUMER_ERR, |
| ErrorCode, 0, NULL, 0, |
| "JoinGroup failed: %s", |
| rd_kafka_err2str(ErrorCode)); |
| |
| if (ErrorCode == RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID) |
| rd_kafka_cgrp_set_member_id(rkcg, ""); |
| rd_kafka_cgrp_set_join_state(rkcg, |
| RD_KAFKA_CGRP_JOIN_STATE_INIT); |
| } |
| |
| return; |
| |
| err_parse: |
| ErrorCode = rkbuf->rkbuf_err; |
| goto err; |
| } |
| |
| |
| /** |
| * @brief Check subscription against requested Metadata. |
| */ |
| static rd_kafka_op_res_t |
| rd_kafka_cgrp_handle_Metadata_op (rd_kafka_t *rk, rd_kafka_q_t *rkq, |
| rd_kafka_op_t *rko) { |
| rd_kafka_cgrp_t *rkcg = rk->rk_cgrp; |
| |
| if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY) |
| return RD_KAFKA_OP_RES_HANDLED; /* Terminating */ |
| |
| rd_kafka_cgrp_metadata_update_check(rkcg, 0/*dont rejoin*/); |
| |
| return RD_KAFKA_OP_RES_HANDLED; |
| } |
| |
| |
| /** |
| * @brief (Async) Refresh metadata (for cgrp's needs) |
| * |
| * @returns 1 if metadata refresh was requested, or 0 if metadata is |
| * up to date, or -1 if no broker is available for metadata requests. |
| * |
| * @locks none |
| * @locality rdkafka main thread |
| */ |
static int rd_kafka_cgrp_metadata_refresh (rd_kafka_cgrp_t *rkcg,
                                           int *metadata_agep,
                                           const char *reason) {
        rd_kafka_t *rk = rkcg->rkcg_rk;
        rd_kafka_op_t *rko;
        rd_list_t topics;       /* rd_list_t(char *): topic names to query */
        rd_kafka_resp_err_t err;

        rd_list_init(&topics, 8, rd_free);

        /* Insert all non-wildcard topics in cache. */
        rd_kafka_metadata_cache_hint_rktparlist(rkcg->rkcg_rk,
                                                rkcg->rkcg_subscription,
                                                NULL, 0/*dont replace*/);

        if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION) {
                /* For wildcard subscriptions make sure the
                 * cached full metadata isn't too old. */
                int metadata_age = -1;  /* -1 = no full metadata yet */

                if (rk->rk_ts_full_metadata)
                        metadata_age = (int)(rd_clock() -
                                             rk->rk_ts_full_metadata)/1000;

                *metadata_agep = metadata_age;

                if (metadata_age != -1 &&
                    metadata_age <=
                    /* The +1000 is since metadata.refresh.interval.ms
                     * can be set to 0. */
                    rk->rk_conf.metadata_refresh_interval_ms + 1000) {
                        rd_kafka_dbg(rk, CGRP|RD_KAFKA_DBG_METADATA,
                                     "CGRPMETADATA",
                                     "%s: metadata for wildcard subscription "
                                     "is up to date (%dms old)",
                                     reason, *metadata_agep);
                        rd_list_destroy(&topics);
                        return 0; /* Up-to-date */
                }

        } else {
                /* Check that all subscribed topics are in the cache. */
                int r;

                rd_kafka_topic_partition_list_get_topic_names(
                        rkcg->rkcg_subscription, &topics, 0/*no regexps*/);

                /* Read-lock the instance while scanning the metadata cache. */
                rd_kafka_rdlock(rk);
                r = rd_kafka_metadata_cache_topics_count_exists(rk, &topics,
                                                                metadata_agep);
                rd_kafka_rdunlock(rk);

                if (r == rd_list_cnt(&topics)) {
                        /* All subscribed topics present in cache. */
                        rd_kafka_dbg(rk, CGRP|RD_KAFKA_DBG_METADATA,
                                     "CGRPMETADATA",
                                     "%s: metadata for subscription "
                                     "is up to date (%dms old)", reason,
                                     *metadata_agep);
                        rd_list_destroy(&topics);
                        return 0; /* Up-to-date and all topics exist. */
                }

                rd_kafka_dbg(rk, CGRP|RD_KAFKA_DBG_METADATA,
                             "CGRPMETADATA",
                             "%s: metadata for subscription "
                             "only available for %d/%d topics (%dms old)",
                             reason, r, rd_list_cnt(&topics), *metadata_agep);

        }

        /* Async request, result will be triggered from
         * rd_kafka_parse_metadata(). */
        rko = rd_kafka_op_new_cb(rkcg->rkcg_rk, RD_KAFKA_OP_METADATA,
                                 rd_kafka_cgrp_handle_Metadata_op);
        rd_kafka_op_set_replyq(rko, rkcg->rkcg_ops, 0);

        /* An empty topics list (wildcard case) requests full metadata. */
        err = rd_kafka_metadata_request(rkcg->rkcg_rk, NULL, &topics,
                                        reason, rko);
        if (err) {
                /* No usable broker: caller may retry later. */
                rd_kafka_dbg(rk, CGRP|RD_KAFKA_DBG_METADATA,
                             "CGRPMETADATA",
                             "%s: need to refresh metadata (%dms old) "
                             "but no usable brokers available: %s",
                             reason, *metadata_agep, rd_kafka_err2str(err));
                rd_kafka_op_destroy(rko); /* the rko is ours to free on error */
        }

        rd_list_destroy(&topics);

        return err ? -1 : 1;
}
| |
| |
| |
/**
 * @brief Attempt to (re)join the group: refresh metadata if required,
 *        update the effective subscription, and send a JoinGroupRequest.
 *
 * Only acts when the cgrp is in state UP and join-state INIT;
 * otherwise this is a no-op.
 *
 * @locality rdkafka main thread
 */
static void rd_kafka_cgrp_join (rd_kafka_cgrp_t *rkcg) {
        int metadata_age;

        if (rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP ||
            rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_INIT)
                return;

        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "JOIN",
                     "Group \"%.*s\": join with %d (%d) subscribed topic(s)",
                     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                     rd_list_cnt(rkcg->rkcg_subscribed_topics),
                     rkcg->rkcg_subscription->cnt);


        /* See if we need to query metadata to continue:
         * - if subscription contains wildcards:
         *   * query all topics in cluster
         *
         * - if subscription does not contain wildcards but
         *   some topics are missing from the local metadata cache:
         *   * query subscribed topics (all cached ones)
         *
         * - otherwise:
         *   * rely on topic metadata cache
         */
        /* We need up-to-date full metadata to continue,
         * refresh metadata if necessary. */
        /* NOTE(review): a -1 return (no usable broker for the metadata
         * request) falls through and proceeds with possibly stale
         * metadata — confirm this is intended. */
        if (rd_kafka_cgrp_metadata_refresh(rkcg, &metadata_age,
                                           "consumer join") == 1) {
                rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "JOIN",
                             "Group \"%.*s\": "
                             "postponing join until up-to-date "
                             "metadata is available",
                             RD_KAFKAP_STR_PR(rkcg->rkcg_group_id));
                return; /* ^ async call */
        }

        /* Re-derive the effective subscribed topic list from cached
         * metadata if it is currently empty. */
        if (rd_list_empty(rkcg->rkcg_subscribed_topics))
                rd_kafka_cgrp_metadata_update_check(rkcg, 0/*dont join*/);

        if (rd_list_empty(rkcg->rkcg_subscribed_topics)) {
                /* Still no matching topics: wait for next metadata refresh. */
                rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "JOIN",
                             "Group \"%.*s\": "
                             "no matching topics based on %dms old metadata: "
                             "next metadata refresh in %dms",
                             RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                             metadata_age,
                             rkcg->rkcg_rk->rk_conf.
                             metadata_refresh_interval_ms - metadata_age);
                return;
        }

        /* Send the JoinGroup request (async); the response handler
         * advances the join state machine. */
        rd_kafka_cgrp_set_join_state(rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_JOIN);
        rd_kafka_JoinGroupRequest(rkcg->rkcg_rkb, rkcg->rkcg_group_id,
                                  rkcg->rkcg_member_id,
                                  rkcg->rkcg_rk->rk_conf.group_protocol_type,
                                  rkcg->rkcg_subscribed_topics,
                                  RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
                                  rd_kafka_cgrp_handle_JoinGroup, rkcg);
}
| |
| /** |
| * Rejoin group on update to effective subscribed topics list |
| */ |
| static void rd_kafka_cgrp_rejoin (rd_kafka_cgrp_t *rkcg) { |
| /* |
| * Clean-up group leader duties, if any. |
| */ |
| rd_kafka_cgrp_group_leader_reset(rkcg, "Group rejoin"); |
| |
| rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "REJOIN", |
| "Group \"%.*s\" rejoining in join-state %s " |
| "with%s an assignment", |
| RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), |
| rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state], |
| rkcg->rkcg_assignment ? "" : "out"); |
| |
| /* Remove assignment (async), if any. If there is already an |
| * unassign in progress we dont need to bother. */ |
| if (rkcg->rkcg_assignment) { |
| if (!(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_UNASSIGN)) { |
| rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_WAIT_UNASSIGN; |
| |
| rd_kafka_rebalance_op( |
| rkcg, |
| RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS, |
| rkcg->rkcg_assignment, "unsubscribe"); |
| } |
| } else { |
| rd_kafka_cgrp_set_join_state(rkcg, |
| RD_KAFKA_CGRP_JOIN_STATE_INIT); |
| rd_kafka_cgrp_join(rkcg); |
| } |
| } |
| |
| /** |
| * Update the effective list of subscribed topics and trigger a rejoin |
| * if it changed. |
| * |
| * Set \p tinfos to NULL for clearing the list. |
| * |
| * @param tinfos rd_list_t(rd_kafka_topic_info_t *): new effective topic list |
| * |
| * @returns 1 on change, else 0. |
| * |
| * @remark Takes ownership of \p tinfos |
| */ |
static int
rd_kafka_cgrp_update_subscribed_topics (rd_kafka_cgrp_t *rkcg,
                                        rd_list_t *tinfos) {
        rd_kafka_topic_info_t *tinfo;
        int i;

        if (!tinfos) {
                /* NULL means "clear": substitute an empty list so the
                 * comparison/swap logic below can be shared. */
                if (!rd_list_empty(rkcg->rkcg_subscribed_topics))
                        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "SUBSCRIPTION",
                                     "Group \"%.*s\": "
                                     "clearing subscribed topics list (%d)",
                                     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                                     rd_list_cnt(rkcg->rkcg_subscribed_topics));
                tinfos = rd_list_new(0, (void *)rd_kafka_topic_info_destroy);

        } else {
                if (rd_list_cnt(tinfos) == 0)
                        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "SUBSCRIPTION",
                                     "Group \"%.*s\": "
                                     "no topics in metadata matched "
                                     "subscription",
                                     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id));
        }

        /* Sort for comparison */
        rd_list_sort(tinfos, rd_kafka_topic_info_cmp);

        /* Compare to existing to see if anything changed. */
        if (!rd_list_cmp(rkcg->rkcg_subscribed_topics, tinfos,
                         rd_kafka_topic_info_cmp)) {
                /* No change: we own tinfos, so free it here. */
                rd_list_destroy(tinfos);
                return 0;
        }

        rd_kafka_dbg(rkcg->rkcg_rk, CGRP|RD_KAFKA_DBG_METADATA, "SUBSCRIPTION",
                     "Group \"%.*s\": effective subscription list changed "
                     "from %d to %d topic(s):",
                     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                     rd_list_cnt(rkcg->rkcg_subscribed_topics),
                     rd_list_cnt(tinfos));

        RD_LIST_FOREACH(tinfo, tinfos, i)
                rd_kafka_dbg(rkcg->rkcg_rk, CGRP|RD_KAFKA_DBG_METADATA,
                             "SUBSCRIPTION",
                             " Topic %s with %d partition(s)",
                             tinfo->topic, tinfo->partition_cnt);

        /* Replace the old effective list with the new one (ownership
         * of tinfos transfers to the cgrp). */
        rd_list_destroy(rkcg->rkcg_subscribed_topics);

        rkcg->rkcg_subscribed_topics = tinfos;

        return 1;
}
| |
| |
| |
| /** |
| * @brief Handle heart Heartbeat response. |
| */ |
| void rd_kafka_cgrp_handle_Heartbeat (rd_kafka_t *rk, |
| rd_kafka_broker_t *rkb, |
| rd_kafka_resp_err_t err, |
| rd_kafka_buf_t *rkbuf, |
| rd_kafka_buf_t *request, |
| void *opaque) { |
| rd_kafka_cgrp_t *rkcg = rk->rk_cgrp; |
| const int log_decode_errors = LOG_ERR; |
| int16_t ErrorCode = 0; |
| int actions; |
| |
| if (err) { |
| ErrorCode = err; |
| goto err; |
| } |
| |
| rd_kafka_buf_read_i16(rkbuf, &ErrorCode); |
| |
| err: |
| actions = rd_kafka_err_action(rkb, ErrorCode, rkbuf, request, |
| RD_KAFKA_ERR_ACTION_END); |
| |
| rd_dassert(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT); |
| rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT; |
| |
| if (actions & RD_KAFKA_ERR_ACTION_REFRESH) { |
| /* Re-query for coordinator */ |
| rd_kafka_cgrp_op(rkcg, NULL, RD_KAFKA_NO_REPLYQ, |
| RD_KAFKA_OP_COORD_QUERY, ErrorCode); |
| /* Schedule a retry */ |
| if (ErrorCode != RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP) { |
| rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT; |
| rd_kafka_buf_keep(request); |
| rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT; |
| rd_kafka_broker_buf_retry(request->rkbuf_rkb, request); |
| } |
| return; |
| } |
| |
| if (ErrorCode != 0 && ErrorCode != RD_KAFKA_RESP_ERR__DESTROY) |
| rd_kafka_cgrp_handle_heartbeat_error(rkcg, ErrorCode); |
| |
| return; |
| |
| err_parse: |
| ErrorCode = rkbuf->rkbuf_err; |
| goto err; |
| } |
| |
| |
| |
| /** |
| * @brief Send Heartbeat |
| */ |
static void rd_kafka_cgrp_heartbeat (rd_kafka_cgrp_t *rkcg,
                                     rd_kafka_broker_t *rkb) {
        /* Skip heartbeat if we have one in transit */
        if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT)
                return;

        /* Flag is cleared again by rd_kafka_cgrp_handle_Heartbeat(). */
        rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT;
        rd_kafka_HeartbeatRequest(rkb, rkcg->rkcg_group_id,
                                  rkcg->rkcg_generation_id,
                                  rkcg->rkcg_member_id,
                                  RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
                                  rd_kafka_cgrp_handle_Heartbeat, NULL);
}
| |
| /** |
| * Cgrp is now terminated: decommission it and signal back to application. |
| */ |
static void rd_kafka_cgrp_terminated (rd_kafka_cgrp_t *rkcg) {

        /* All outstanding unassigns/commits must have completed and the
         * cgrp must already be in the TERM state before final teardown. */
        rd_kafka_assert(NULL, rkcg->rkcg_wait_unassign_cnt == 0);
        rd_kafka_assert(NULL, rkcg->rkcg_wait_commit_cnt == 0);
        rd_kafka_assert(NULL, !(rkcg->rkcg_flags&RD_KAFKA_CGRP_F_WAIT_UNASSIGN));
        rd_kafka_assert(NULL, rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_TERM);

        /* Stop the auto-commit timer. */
        rd_kafka_timer_stop(&rkcg->rkcg_rk->rk_timers,
                            &rkcg->rkcg_offset_commit_tmr, 1/*lock*/);

        rd_kafka_q_purge(rkcg->rkcg_wait_coord_q);

        /* Disable and empty ops queue since there will be no
         * (broker) thread serving it anymore after the unassign_broker
         * below.
         * This prevents hang on destroy where responses are enqueued on rkcg_ops
         * without anything serving the queue. */
        rd_kafka_q_disable(rkcg->rkcg_ops);
        rd_kafka_q_purge(rkcg->rkcg_ops);

        if (rkcg->rkcg_rkb)
                rd_kafka_cgrp_unassign_broker(rkcg);

        if (rkcg->rkcg_reply_rko) {
                /* Signal back to application. */
                rd_kafka_replyq_enq(&rkcg->rkcg_reply_rko->rko_replyq,
                                    rkcg->rkcg_reply_rko, 0);
                rkcg->rkcg_reply_rko = NULL;
        }
}
| |
| |
| /** |
| * If a cgrp is terminating and all outstanding ops are now finished |
| * then progress to final termination and return 1. |
| * Else returns 0. |
| */ |
static RD_INLINE int rd_kafka_cgrp_try_terminate (rd_kafka_cgrp_t *rkcg) {

        /* Already terminated. */
        if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_TERM)
                return 1;

        /* Termination not requested: nothing to do. */
        if (likely(!(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE)))
                return 0;

        /* Check if wait-coord queue has timed out.
         * Deferred ops are given up to group_session_timeout_ms after
         * termination was initiated to find a coordinator. */
        if (rd_kafka_q_len(rkcg->rkcg_wait_coord_q) > 0 &&
            rkcg->rkcg_ts_terminate +
            (rkcg->rkcg_rk->rk_conf.group_session_timeout_ms * 1000) <
            rd_clock()) {
                rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPTERM",
                             "Group \"%s\": timing out %d op(s) in "
                             "wait-for-coordinator queue",
                             rkcg->rkcg_group_id->str,
                             rd_kafka_q_len(rkcg->rkcg_wait_coord_q));
                rd_kafka_q_disable(rkcg->rkcg_wait_coord_q);
                if (rd_kafka_q_concat(rkcg->rkcg_ops,
                                      rkcg->rkcg_wait_coord_q) == -1) {
                        /* ops queue shut down, purge coord queue */
                        rd_kafka_q_purge(rkcg->rkcg_wait_coord_q);
                }
        }

        /* Terminate only when all outstanding work has drained:
         * no pending rebalance callback, no tracked partitions,
         * no in-flight unassigns or commits, no wait-unassign flag. */
        if (!RD_KAFKA_CGRP_WAIT_REBALANCE_CB(rkcg) &&
            rd_list_empty(&rkcg->rkcg_toppars) &&
            rkcg->rkcg_wait_unassign_cnt == 0 &&
            rkcg->rkcg_wait_commit_cnt == 0 &&
            !(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_UNASSIGN)) {
                /* Since we might be deep down in a 'rko' handler
                 * called from cgrp_op_serve() we cant call terminated()
                 * directly since it will decommission the rkcg_ops queue
                 * that might be locked by intermediate functions.
                 * Instead set the TERM state and let the cgrp terminate
                 * at its own discretion. */
                rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_TERM);
                return 1;
        } else {
                rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPTERM",
                             "Group \"%s\": "
                             "waiting for %s%d toppar(s), %d unassignment(s), "
                             "%d commit(s)%s (state %s, join-state %s) "
                             "before terminating",
                             rkcg->rkcg_group_id->str,
                             RD_KAFKA_CGRP_WAIT_REBALANCE_CB(rkcg) ?
                             "rebalance_cb, ": "",
                             rd_list_cnt(&rkcg->rkcg_toppars),
                             rkcg->rkcg_wait_unassign_cnt,
                             rkcg->rkcg_wait_commit_cnt,
                             (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_UNASSIGN)?
                             ", wait-unassign flag," : "",
                             rd_kafka_cgrp_state_names[rkcg->rkcg_state],
                             rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);
                return 0;
        }
}
| |
| |
| /** |
| * Add partition to this cgrp management |
| */ |
static void rd_kafka_cgrp_partition_add (rd_kafka_cgrp_t *rkcg,
                                         rd_kafka_toppar_t *rktp) {
        rd_kafka_dbg(rkcg->rkcg_rk, CGRP,"PARTADD",
                     "Group \"%s\": add %s [%"PRId32"]",
                     rkcg->rkcg_group_id->str,
                     rktp->rktp_rkt->rkt_topic->str,
                     rktp->rktp_partition);

        /* Must not already be tracked by the cgrp. */
        rd_kafka_assert(rkcg->rkcg_rk, !rktp->rktp_s_for_cgrp);
        /* Hold a shared toppar reference for as long as the cgrp
         * tracks this partition (released in ..partition_del()). */
        rktp->rktp_s_for_cgrp = rd_kafka_toppar_keep(rktp);
        rd_list_add(&rkcg->rkcg_toppars, rktp->rktp_s_for_cgrp);
}
| |
| /** |
| * Remove partition from this cgrp management |
| */ |
static void rd_kafka_cgrp_partition_del (rd_kafka_cgrp_t *rkcg,
                                         rd_kafka_toppar_t *rktp) {
        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "PARTDEL",
                     "Group \"%s\": delete %s [%"PRId32"]",
                     rkcg->rkcg_group_id->str,
                     rktp->rktp_rkt->rkt_topic->str,
                     rktp->rktp_partition);
        /* Must currently be tracked (see ..partition_add()). */
        rd_kafka_assert(rkcg->rkcg_rk, rktp->rktp_s_for_cgrp);

        /* Drop the cgrp's shared toppar reference. */
        rd_list_remove(&rkcg->rkcg_toppars, rktp->rktp_s_for_cgrp);
        rd_kafka_toppar_destroy(rktp->rktp_s_for_cgrp);
        rktp->rktp_s_for_cgrp = NULL;

        /* Removing a partition may unblock a pending termination. */
        rd_kafka_cgrp_try_terminate(rkcg);
}
| |
| |
| |
| /** |
| * Reply for OffsetFetch from call below. |
| */ |
static void rd_kafka_cgrp_offsets_fetch_response (
	rd_kafka_t *rk,
	rd_kafka_broker_t *rkb,
	rd_kafka_resp_err_t err,
	rd_kafka_buf_t *reply,
	rd_kafka_buf_t *request,
	void *opaque) {
        /* Opaque carries ownership of the offsets list; it is destroyed
         * on every exit path of this function. */
	rd_kafka_topic_partition_list_t *offsets = opaque;
	rd_kafka_cgrp_t *rkcg;

	if (err == RD_KAFKA_RESP_ERR__DESTROY) {
                /* Termination, quick cleanup. */
		rd_kafka_topic_partition_list_destroy(offsets);
                return;
        }

        rkcg = rd_kafka_cgrp_get(rk);

        /* Stale response from before a rebalance barrier: discard. */
        if (rd_kafka_buf_version_outdated(request, rkcg->rkcg_version)) {
                rd_kafka_topic_partition_list_destroy(offsets);
                return;
        }

	rd_kafka_topic_partition_list_log(rk, "OFFSETFETCH", offsets);
        /* If all partitions already had usable offsets then there
         * was no request sent and thus no reply, the offsets list is
	 * good to go. */
	if (reply)
		err = rd_kafka_handle_OffsetFetch(rk, rkb, err,
						  reply, request, offsets,
						  1/* Update toppars */);
	if (err) {
		rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "OFFSET",
			     "Offset fetch error: %s",
			     rd_kafka_err2str(err));

                /* __WAIT_COORD means a retry will happen later;
                 * only surface other errors to the application. */
		if (err != RD_KAFKA_RESP_ERR__WAIT_COORD)
			rd_kafka_q_op_err(rkcg->rkcg_q,
					  RD_KAFKA_OP_CONSUMER_ERR, err, 0,
					  NULL, 0,
					  "Failed to fetch offsets: %s",
					  rd_kafka_err2str(err));
	} else {
		if (RD_KAFKA_CGRP_CAN_FETCH_START(rkcg))
			rd_kafka_cgrp_partitions_fetch_start(
				rkcg, offsets, 1 /* usable offsets */);
		else
			rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "OFFSET",
				     "Group \"%.*s\": "
				     "ignoring Offset fetch response for "
				     "%d partition(s): in state %s",
				     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
				     offsets ? offsets->cnt : -1,
				     rd_kafka_cgrp_join_state_names[
					     rkcg->rkcg_join_state]);
	}

	rd_kafka_topic_partition_list_destroy(offsets);
}
| |
| /** |
| * Fetch offsets for a list of partitions |
| */ |
static void
rd_kafka_cgrp_offsets_fetch (rd_kafka_cgrp_t *rkcg, rd_kafka_broker_t *rkb,
                             rd_kafka_topic_partition_list_t *offsets) {
        rd_kafka_topic_partition_list_t *use_offsets;

        /* Make a copy of the offsets: the copy (use_offsets) is owned by
         * the response handler, while the request itself is built from
         * the caller-owned \p offsets. */
        use_offsets = rd_kafka_topic_partition_list_copy(offsets);

        if (rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP || !rkb)
                /* No coordinator available: fake a __WAIT_COORD reply so
                 * the response handler can defer/propagate accordingly. */
		rd_kafka_cgrp_offsets_fetch_response(
			rkcg->rkcg_rk, rkb, RD_KAFKA_RESP_ERR__WAIT_COORD,
			NULL, NULL, use_offsets);
        else {
                rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "OFFSET",
                             "Fetch %d offsets with v%d",
                             use_offsets->cnt, rkcg->rkcg_version);
                /* Versioned replyq: response is dropped if a rebalance
                 * barrier has bumped rkcg_version in the meantime. */
                rd_kafka_OffsetFetchRequest(
                        rkb, 1, offsets,
                        RD_KAFKA_REPLYQ(rkcg->rkcg_ops, rkcg->rkcg_version),
			rd_kafka_cgrp_offsets_fetch_response,
			use_offsets);
        }

}
| |
| |
| /** |
| * Start fetching all partitions in 'assignment' (async) |
| */ |
static void
rd_kafka_cgrp_partitions_fetch_start0 (rd_kafka_cgrp_t *rkcg,
				       rd_kafka_topic_partition_list_t
				       *assignment, int usable_offsets,
				       int line) {
        int i;

        /* If waiting for offsets to commit we need that to finish first
         * before starting fetchers (which might fetch those stored offsets).*/
        if (rkcg->rkcg_wait_commit_cnt > 0) {
                rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "FETCHSTART",
                             "Group \"%s\": not starting fetchers "
                             "for %d assigned partition(s) in join-state %s "
                             "(usable_offsets=%s, v%"PRId32", line %d): "
                             "waiting for %d commit(s)",
                             rkcg->rkcg_group_id->str, assignment->cnt,
                             rd_kafka_cgrp_join_state_names[rkcg->
                                                            rkcg_join_state],
                             usable_offsets ? "yes":"no",
                             rkcg->rkcg_version, line,
                             rkcg->rkcg_wait_commit_cnt);
                return;
        }

        /* Bump the op version so stale responses from the previous
         * assignment epoch are discarded. */
        rd_kafka_cgrp_version_new_barrier(rkcg);

        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "FETCHSTART",
                     "Group \"%s\": starting fetchers for %d assigned "
                     "partition(s) in join-state %s "
                     "(usable_offsets=%s, v%"PRId32", line %d)",
                     rkcg->rkcg_group_id->str, assignment->cnt,
                     rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
                     usable_offsets ? "yes":"no",
                     rkcg->rkcg_version, line);

        rd_kafka_topic_partition_list_log(rkcg->rkcg_rk,
                                          "FETCHSTART", assignment);

        if (assignment->cnt == 0)
                return;

        /* Check if offsets are really unusable, this is to catch the
         * case where the entire assignment has absolute offsets set which
         * should make us skip offset lookups. */
        if (!usable_offsets)
                usable_offsets =
                        rd_kafka_topic_partition_list_count_abs_offsets(
                                assignment) == assignment->cnt;

        if (!usable_offsets &&
            rkcg->rkcg_rk->rk_conf.offset_store_method ==
            RD_KAFKA_OFFSET_METHOD_BROKER) {

                /* Fetch offsets for all assigned partitions */
                rd_kafka_cgrp_offsets_fetch(rkcg, rkcg->rkcg_rkb, assignment);

        } else {
                /* Offsets are usable (or broker storage not used):
                 * start/seek the fetchers directly. */
                rd_kafka_cgrp_set_join_state(rkcg,
                                             RD_KAFKA_CGRP_JOIN_STATE_STARTED);

                for (i = 0 ; i < assignment->cnt ; i++) {
                        rd_kafka_topic_partition_t *rktpar =
                                &assignment->elems[i];
                        shptr_rd_kafka_toppar_t *s_rktp = rktpar->_private;
                        rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp);

			if (!rktp->rktp_assigned) {
				rktp->rktp_assigned = 1;
                                rkcg->rkcg_assigned_cnt++;

				/* Start fetcher for partition and
				 * forward partition's fetchq to
				 * consumer groups queue. */
				rd_kafka_toppar_op_fetch_start(
					rktp, rktpar->offset,
					rkcg->rkcg_q, RD_KAFKA_NO_REPLYQ);
			} else {
				int64_t offset;
				/* Fetcher already started,
				 * just do seek to update offset */
				rd_kafka_toppar_lock(rktp);
				if (rktpar->offset < rktp->rktp_app_offset)
					offset = rktp->rktp_app_offset;
				else
                                        offset = rktpar->offset;
				rd_kafka_toppar_unlock(rktp);
				rd_kafka_toppar_op_seek(rktp, offset,
							RD_KAFKA_NO_REPLYQ);
			}
                }
        }

        /* Sanity: can never have more assigned partitions than the
         * current assignment holds. */
	rd_kafka_assert(NULL, rkcg->rkcg_assigned_cnt <=
			(rkcg->rkcg_assignment ? rkcg->rkcg_assignment->cnt : 0));
}
| |
| |
| |
| |
| |
| /** |
| * @brief Defer offset commit (rko) until coordinator is available. |
| * |
| * @returns 1 if the rko was deferred or 0 if the defer queue is disabled |
| * or rko already deferred. |
| */ |
static int rd_kafka_cgrp_defer_offset_commit (rd_kafka_cgrp_t *rkcg,
                                              rd_kafka_op_t *rko,
                                              const char *reason) {

        /* wait_coord_q is disabled session.timeout.ms after
         * group close() has been initiated.
         * A non-zero ts_timeout means the op was already deferred once. */
        if (rko->rko_u.offset_commit.ts_timeout != 0 ||
            !rd_kafka_q_ready(rkcg->rkcg_wait_coord_q))
                return 0;

        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "COMMIT",
                     "Group \"%s\": "
                     "unable to OffsetCommit in state %s: %s: "
                     "coordinator (%s) is unavailable: "
                     "retrying later",
                     rkcg->rkcg_group_id->str,
                     rd_kafka_cgrp_state_names[rkcg->rkcg_state],
                     reason,
                     rkcg->rkcg_rkb ?
                     rd_kafka_broker_name(rkcg->rkcg_rkb) :
                     "none");

        /* Mark for reprocessing and give it session.timeout.ms to find
         * a coordinator before being timed out (see try_terminate()). */
        rko->rko_flags |= RD_KAFKA_OP_F_REPROCESS;
        rko->rko_u.offset_commit.ts_timeout = rd_clock() +
                (rkcg->rkcg_rk->rk_conf.group_session_timeout_ms
                 * 1000);
        rd_kafka_q_enq(rkcg->rkcg_wait_coord_q, rko);

        return 1;
}
| |
| |
| /** |
| * @brief Handler of OffsetCommit response (after parsing). |
| * @remark \p offsets may be NULL if \p err is set |
| * @returns the number of partitions with errors encountered |
| */ |
static int
rd_kafka_cgrp_handle_OffsetCommit (rd_kafka_cgrp_t *rkcg,
                                   rd_kafka_resp_err_t err,
                                   rd_kafka_topic_partition_list_t
                                   *offsets) {
	int i;
        int errcnt = 0;

        if (!err) {
                /* Update toppars' committed offset */
                for (i = 0 ; i < offsets->cnt ; i++) {
                        rd_kafka_topic_partition_t *rktpar =&offsets->elems[i];
                        shptr_rd_kafka_toppar_t *s_rktp;
                        rd_kafka_toppar_t *rktp;

                        if (unlikely(rktpar->err)) {
                                /* Per-partition commit failure. */
                                rd_kafka_dbg(rkcg->rkcg_rk, TOPIC,
                                             "OFFSET",
                                             "OffsetCommit failed for "
                                             "%s [%"PRId32"] at offset "
                                             "%"PRId64": %s",
                                             rktpar->topic, rktpar->partition,
                                             rktpar->offset,
                                             rd_kafka_err2str(rktpar->err));
                                errcnt++;
                                continue;
                        } else if (unlikely(rktpar->offset < 0))
                                /* Logical/invalid offset: nothing stored. */
                                continue;

                        s_rktp = rd_kafka_topic_partition_list_get_toppar(
                                rkcg->rkcg_rk, rktpar);
                        if (!s_rktp)
                                continue;

                        /* Record the committed offset on the toppar
                         * (toppar lock held for the update). */
                        rktp = rd_kafka_toppar_s2i(s_rktp);
                        rd_kafka_toppar_lock(rktp);
                        rktp->rktp_committed_offset = rktpar->offset;
                        rd_kafka_toppar_unlock(rktp);

                        rd_kafka_toppar_destroy(s_rktp);
                }
        }

        /* A finished commit may unblock a pending termination. */
        if (rd_kafka_cgrp_try_terminate(rkcg))
                return errcnt; /* terminated */

        if (rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN)
                rd_kafka_cgrp_check_unassign_done(rkcg,
                                                  "OffsetCommit done");

        return errcnt;
}
| |
| |
| |
| |
| /** |
| * Handle OffsetCommitResponse |
| * Takes the original 'rko' as opaque argument. |
| * @remark \p rkb, rkbuf, and request may be NULL in a number of |
| * error cases (e.g., _NO_OFFSET, _WAIT_COORD) |
| */ |
static void rd_kafka_cgrp_op_handle_OffsetCommit (rd_kafka_t *rk,
						  rd_kafka_broker_t *rkb,
						  rd_kafka_resp_err_t err,
						  rd_kafka_buf_t *rkbuf,
						  rd_kafka_buf_t *request,
						  void *opaque) {
	rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
        rd_kafka_op_t *rko_orig = opaque;
	rd_kafka_topic_partition_list_t *offsets =
		rko_orig->rko_u.offset_commit.partitions; /* maybe NULL */
        int errcnt;
        int offset_commit_cb_served = 0;

	RD_KAFKA_OP_TYPE_ASSERT(rko_orig, RD_KAFKA_OP_OFFSET_COMMIT);

        /* Response from a previous assignment epoch: treat as destroy.
         * NOTE(review): err is overwritten by handle_OffsetCommit() below;
         * presumably it passes __DESTROY through unchanged — verify. */
        if (rd_kafka_buf_version_outdated(request, rkcg->rkcg_version))
                err = RD_KAFKA_RESP_ERR__DESTROY;

	err = rd_kafka_handle_OffsetCommit(rk, rkb, err, rkbuf,
					   request, offsets);

        /* rkb may be NULL for locally-generated errors. */
        if (rkb)
                rd_rkb_dbg(rkb, CGRP, "COMMIT",
                           "OffsetCommit for %d partition(s): %s: returned: %s",
                           offsets ? offsets->cnt : -1,
                           rko_orig->rko_u.offset_commit.reason,
                           rd_kafka_err2str(err));
        else
                rd_kafka_dbg(rk, CGRP, "COMMIT",
                             "OffsetCommit for %d partition(s): %s: returned: %s",
                             offsets ? offsets->cnt : -1,
                             rko_orig->rko_u.offset_commit.reason,
                             rd_kafka_err2str(err));

        if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS)
                return; /* Retrying */
        else if (err == RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP ||
                 err == RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE) {

                /* future-proofing, see timeout_scan(). */
                rd_kafka_assert(NULL, err != RD_KAFKA_RESP_ERR__WAIT_COORD);

                /* Coordinator unavailable: defer the commit until a
                 * coordinator is known again (keeps ownership of rko_orig). */
                if (rd_kafka_cgrp_defer_offset_commit(rkcg, rko_orig,
                                                      rd_kafka_err2str(err)))
                        return;

                /* FALLTHRU and error out */
        }

        /* This commit is now accounted for (was counted in
         * rd_kafka_cgrp_offsets_commit()). */
	rd_kafka_assert(NULL, rkcg->rkcg_wait_commit_cnt > 0);
	rkcg->rkcg_wait_commit_cnt--;

        /* Silent exits: instance destroy, or an empty commit that the
         * issuer asked to suppress. */
	if (err == RD_KAFKA_RESP_ERR__DESTROY ||
            (err == RD_KAFKA_RESP_ERR__NO_OFFSET &&
             rko_orig->rko_u.offset_commit.silent_empty)) {
		rd_kafka_op_destroy(rko_orig);
                rd_kafka_cgrp_check_unassign_done(
                        rkcg,
                        err == RD_KAFKA_RESP_ERR__DESTROY ?
                        "OffsetCommit done (__DESTROY)" :
                        "OffsetCommit done (__NO_OFFSET)");
		return;
	}

        /* Call on_commit interceptors */
        if (err != RD_KAFKA_RESP_ERR__NO_OFFSET &&
            err != RD_KAFKA_RESP_ERR__DESTROY &&
            offsets && offsets->cnt > 0)
                rd_kafka_interceptors_on_commit(rk, offsets, err);


	/* If no special callback is set but a offset_commit_cb has
	 * been set in conf then post an event for the latter. */
	if (!rko_orig->rko_u.offset_commit.cb && rk->rk_conf.offset_commit_cb) {
                rd_kafka_op_t *rko_reply = rd_kafka_op_new_reply(rko_orig, err);

                rd_kafka_op_set_prio(rko_reply, RD_KAFKA_PRIO_HIGH);

		if (offsets)
			rko_reply->rko_u.offset_commit.partitions =
				rd_kafka_topic_partition_list_copy(offsets);

		rko_reply->rko_u.offset_commit.cb =
			rk->rk_conf.offset_commit_cb;
		rko_reply->rko_u.offset_commit.opaque = rk->rk_conf.opaque;

                rd_kafka_q_enq(rk->rk_rep, rko_reply);
                offset_commit_cb_served++;
	}


	/* Enqueue reply to requester's queue, if any. */
	if (rko_orig->rko_replyq.q) {
                rd_kafka_op_t *rko_reply = rd_kafka_op_new_reply(rko_orig, err);

                rd_kafka_op_set_prio(rko_reply, RD_KAFKA_PRIO_HIGH);

                /* Copy offset & partitions & callbacks to reply op.
                 * Shallow-copied pointers are then re-duplicated so the
                 * reply owns its own copies. */
                rko_reply->rko_u.offset_commit = rko_orig->rko_u.offset_commit;
		if (offsets)
			rko_reply->rko_u.offset_commit.partitions =
				rd_kafka_topic_partition_list_copy(offsets);
                if (rko_reply->rko_u.offset_commit.reason)
                        rko_reply->rko_u.offset_commit.reason =
                        rd_strdup(rko_reply->rko_u.offset_commit.reason);

                rd_kafka_replyq_enq(&rko_orig->rko_replyq, rko_reply, 0);
                offset_commit_cb_served++;
        }

        /* Update toppars' committed offsets and count partition errors. */
        errcnt = rd_kafka_cgrp_handle_OffsetCommit(rkcg, err, offsets);

        if (!offset_commit_cb_served &&
            err != RD_KAFKA_RESP_ERR_NO_ERROR &&
            err != RD_KAFKA_RESP_ERR__NO_OFFSET) {
                /* If there is no callback or handler for this (auto)
                 * commit then raise an error to the application (#1043) */
                char tmp[512];

                /* NOTE(review): \p offsets may be NULL per the function
                 * remark; this path dereferences offsets->cnt unguarded
                 * while the logging above guards with `offsets ? ... : -1`
                 * — confirm offsets is always non-NULL when this branch
                 * is reachable. */
                rd_kafka_topic_partition_list_str(
                        offsets, tmp, sizeof(tmp),
                        /*no partition-errs if a global error*/
                        RD_KAFKA_FMT_F_OFFSET |
                        (err ? 0 : RD_KAFKA_FMT_F_ONLY_ERR));

                rd_kafka_log(rkcg->rkcg_rk, LOG_WARNING, "COMMITFAIL",
                             "Offset commit (%s) failed "
                             "for %d/%d partition(s): "
                             "%s%s%s",
                             rko_orig->rko_u.offset_commit.reason,
                             err ? offsets->cnt : errcnt, offsets->cnt,
                             err ? rd_kafka_err2str(err) : "",
                             err ? ": " : "",
                             tmp);
        }

        rd_kafka_op_destroy(rko_orig);
}
| |
| |
| static size_t rd_kafka_topic_partition_has_absolute_offset ( |
| const rd_kafka_topic_partition_t *rktpar, void *opaque) { |
| return rktpar->offset >= 0 ? 1 : 0; |
| } |
| |
| |
| /** |
| * Commit a list of offsets. |
| * Reuse the orignating 'rko' for the async reply. |
| * 'rko->rko_payload' should either by NULL (to commit current assignment) or |
| * a proper topic_partition_list_t with offsets to commit. |
| * The offset list will be altered. |
| * |
| * \p rko...silent_empty: if there are no offsets to commit bail out |
| * silently without posting an op on the reply queue. |
| * \p set_offsets: set offsets in rko->rko_u.offset_commit.partitions |
| * |
| * \p op_version: cgrp's op version to use (or 0) |
| * |
| * Locality: cgrp thread |
| */ |
static void rd_kafka_cgrp_offsets_commit (rd_kafka_cgrp_t *rkcg,
                                          rd_kafka_op_t *rko,
                                          int set_offsets,
                                          const char *reason,
                                          int op_version) {
	rd_kafka_topic_partition_list_t *offsets;
	rd_kafka_resp_err_t err;
        int valid_offsets = 0;

	/* If offsets is NULL we shall use the current assignment. */
	if (!rko->rko_u.offset_commit.partitions && rkcg->rkcg_assignment)
		rko->rko_u.offset_commit.partitions =
			rd_kafka_topic_partition_list_copy(
				rkcg->rkcg_assignment);

	offsets = rko->rko_u.offset_commit.partitions;

        if (offsets) {
                /* Set offsets to commits */
                if (set_offsets)
                        rd_kafka_topic_partition_list_set_offsets(
			rkcg->rkcg_rk, rko->rko_u.offset_commit.partitions, 1,
			RD_KAFKA_OFFSET_INVALID/* def */,
			1 /* is commit */);

                /*  Check the number of valid offsets to commit. */
                valid_offsets = (int)rd_kafka_topic_partition_list_sum(
                        offsets,
                        rd_kafka_topic_partition_has_absolute_offset, NULL);
        }

        if (!(rko->rko_flags & RD_KAFKA_OP_F_REPROCESS)) {
                /* wait_commit_cnt has already been increased for
                 * reprocessed ops. */
                rkcg->rkcg_wait_commit_cnt++;
        }

	if (!valid_offsets) {
                /* No valid offsets */
                err = RD_KAFKA_RESP_ERR__NO_OFFSET;
                goto err;
	}

        /* Coordinator must be up and be a real broker to send the request. */
        if (rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP || !rkcg->rkcg_rkb ||
	    rkcg->rkcg_rkb->rkb_source == RD_KAFKA_INTERNAL) {

                /* Defer the commit (takes ownership of rko) until a
                 * coordinator is available, if possible. */
		if (rd_kafka_cgrp_defer_offset_commit(rkcg, rko, reason))
			return;

		err = RD_KAFKA_RESP_ERR__WAIT_COORD;

	} else {
                int r;

                /* Send OffsetCommit */
                r = rd_kafka_OffsetCommitRequest(
                            rkcg->rkcg_rkb, rkcg, 1, offsets,
                            RD_KAFKA_REPLYQ(rkcg->rkcg_ops, op_version),
                            rd_kafka_cgrp_op_handle_OffsetCommit, rko,
                        reason);

                /* Must have valid offsets to commit if we get here */
                rd_kafka_assert(NULL, r != 0);

                return;
        }



 err:
        /* Propagate error to whoever wanted offset committed.
         * The synthesized "response" also decrements wait_commit_cnt
         * and frees the rko. */
	rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "COMMIT",
		     "OffsetCommit internal error: %s", rd_kafka_err2str(err));
	rd_kafka_cgrp_op_handle_OffsetCommit(rkcg->rkcg_rk, NULL, err,
					     NULL, NULL, rko);
}
| |
| |
| /** |
| * Commit offsets for all assigned partitions. |
| */ |
static void
rd_kafka_cgrp_assigned_offsets_commit (rd_kafka_cgrp_t *rkcg,
                                       const rd_kafka_topic_partition_list_t
                                       *offsets, const char *reason) {
	rd_kafka_op_t *rko;

	rko = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_COMMIT);
        rko->rko_u.offset_commit.reason = rd_strdup(reason);
        /* If the application subscribed to OFFSET_COMMIT events,
         * route the result to the main event queue. */
	if (rkcg->rkcg_rk->rk_conf.enabled_events & RD_KAFKA_EVENT_OFFSET_COMMIT) {
		rd_kafka_op_set_replyq(rko, rkcg->rkcg_rk->rk_rep, 0);
		rko->rko_u.offset_commit.cb =
			rkcg->rkcg_rk->rk_conf.offset_commit_cb; /*maybe NULL*/
		rko->rko_u.offset_commit.opaque = rkcg->rkcg_rk->rk_conf.opaque;
	}
        /* NULL partitions means current assignment */
        if (offsets)
                rko->rko_u.offset_commit.partitions =
                        rd_kafka_topic_partition_list_copy(offsets);
        /* Auto-commits with nothing to commit are silent (no error op). */
        rko->rko_u.offset_commit.silent_empty = 1;
        rd_kafka_cgrp_offsets_commit(rkcg, rko, 1/* set offsets */, reason,
                                     rkcg->rkcg_version);
}
| |
| |
| /** |
| * auto.commit.interval.ms commit timer callback. |
| * |
| * Trigger a group offset commit. |
| * |
| * Locality: rdkafka main thread |
| */ |
| static void rd_kafka_cgrp_offset_commit_tmr_cb (rd_kafka_timers_t *rkts, |
| void *arg) { |
| rd_kafka_cgrp_t *rkcg = arg; |
| |
| rd_kafka_cgrp_assigned_offsets_commit(rkcg, NULL, |
| "cgrp auto commit timer"); |
| } |
| |
| |
| |
| |
| /** |
| * Call when all unassign operations are done to transition to the next state |
| */ |
| static void rd_kafka_cgrp_unassign_done (rd_kafka_cgrp_t *rkcg, |
| const char *reason) { |
| rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "UNASSIGN", |
| "Group \"%s\": unassign done in state %s (join state %s): " |
| "%s: %s", |
| rkcg->rkcg_group_id->str, |
| rd_kafka_cgrp_state_names[rkcg->rkcg_state], |
| rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state], |
| rkcg->rkcg_assignment ? |
| "with new assignment" : "without new assignment", |
| reason); |
| |
| if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN) { |
| rd_kafka_cgrp_leave(rkcg, 1/*ignore response*/); |
| rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN; |
| } |
| |
| if (rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN) { |
| rd_kafka_cgrp_try_terminate(rkcg); |
| return; |
| } |
| |
| if (rkcg->rkcg_assignment) { |
| rd_kafka_cgrp_set_join_state(rkcg, |
| RD_KAFKA_CGRP_JOIN_STATE_ASSIGNED); |
| if (RD_KAFKA_CGRP_CAN_FETCH_START(rkcg)) |
| rd_kafka_cgrp_partitions_fetch_start( |
| rkcg, rkcg->rkcg_assignment, 0); |
| } else { |
| rd_kafka_cgrp_set_join_state(rkcg, |
| RD_KAFKA_CGRP_JOIN_STATE_INIT); |
| } |
| |
| rd_kafka_cgrp_try_terminate(rkcg); |
| } |
| |
| |
| /** |
| * Checks if the current unassignment is done and if so |
| * calls .._done(). |
| * Else does nothing. |
| */ |
| static void rd_kafka_cgrp_check_unassign_done (rd_kafka_cgrp_t *rkcg, |
| const char *reason) { |
| if (rkcg->rkcg_wait_unassign_cnt > 0 || |
| rkcg->rkcg_assigned_cnt > 0 || |
| rkcg->rkcg_wait_commit_cnt > 0 || |
| rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_UNASSIGN) { |
| rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "UNASSIGN", |
| "Unassign not done yet " |
| "(%d wait_unassign, %d assigned, %d wait commit" |
| "%s): %s", |
| rkcg->rkcg_wait_unassign_cnt, |
| rkcg->rkcg_assigned_cnt, |
| rkcg->rkcg_wait_commit_cnt, |
| (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_UNASSIGN)? |
| ", F_WAIT_UNASSIGN" : "", reason); |
| return; |
| } |
| |
| rd_kafka_cgrp_unassign_done(rkcg, reason); |
| } |
| |
| |
| |
| /** |
| * Remove existing assignment. |
| */ |
| static rd_kafka_resp_err_t |
| rd_kafka_cgrp_unassign (rd_kafka_cgrp_t *rkcg) { |
| int i; |
| rd_kafka_topic_partition_list_t *old_assignment; |
| |
| rd_kafka_cgrp_set_join_state(rkcg, |
| RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN); |
| |
| rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_WAIT_UNASSIGN; |
| old_assignment = rkcg->rkcg_assignment; |
| if (!old_assignment) { |
| rd_kafka_cgrp_check_unassign_done( |
| rkcg, "unassign (no previous assignment)"); |
| return RD_KAFKA_RESP_ERR_NO_ERROR; |
| } |
| rkcg->rkcg_assignment = NULL; |
| |
| rd_kafka_cgrp_version_new_barrier(rkcg); |
| |
| rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "UNASSIGN", |
| "Group \"%s\": unassigning %d partition(s) (v%"PRId32")", |
| rkcg->rkcg_group_id->str, old_assignment->cnt, |
| rkcg->rkcg_version); |
| |
| if (rkcg->rkcg_rk->rk_conf.offset_store_method == |
| RD_KAFKA_OFFSET_METHOD_BROKER && |
| rkcg->rkcg_rk->rk_conf.enable_auto_commit) { |
| /* Commit all offsets for all assigned partitions to broker */ |
| rd_kafka_cgrp_assigned_offsets_commit(rkcg, old_assignment, |
| "unassign"); |
| } |
| |
| for (i = 0 ; i < old_assignment->cnt ; i++) { |
| rd_kafka_topic_partition_t *rktpar; |
| shptr_rd_kafka_toppar_t *s_rktp; |
| rd_kafka_toppar_t *rktp; |
| |
| rktpar = &old_assignment->elems[i]; |
| s_rktp = rktpar->_private; |
| rktp = rd_kafka_toppar_s2i(s_rktp); |
| |
| if (rktp->rktp_assigned) { |
| rd_kafka_toppar_op_fetch_stop( |
| rktp, RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0)); |
| rkcg->rkcg_wait_unassign_cnt++; |
| } |
| |
| rd_kafka_toppar_lock(rktp); |
| rd_kafka_toppar_desired_del(rktp); |
| rd_kafka_toppar_unlock(rktp); |
| } |
| |
| /* Resume partition consumption. */ |
| rd_kafka_toppars_pause_resume(rkcg->rkcg_rk, 0/*resume*/, |
| RD_KAFKA_TOPPAR_F_LIB_PAUSE, |
| old_assignment); |
| |
| rd_kafka_topic_partition_list_destroy(old_assignment); |
| |
| rd_kafka_cgrp_check_unassign_done(rkcg, "unassign"); |
| |
| return RD_KAFKA_RESP_ERR_NO_ERROR; |
| } |
| |
| |
| /** |
| * Set new atomic partition assignment |
| * May update \p assignment but will not hold on to it. |
| */ |
| static void |
| rd_kafka_cgrp_assign (rd_kafka_cgrp_t *rkcg, |
| rd_kafka_topic_partition_list_t *assignment) { |
| int i; |
| |
| rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGN", |
| "Group \"%s\": new assignment of %d partition(s) " |
| "in join state %s", |
| rkcg->rkcg_group_id->str, |
| assignment ? assignment->cnt : 0, |
| rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]); |
| |
| /* Get toppar object for each partition. |
| * This is to make sure the rktp stays alive during unassign(). */ |
| for (i = 0 ; assignment && i < assignment->cnt ; i++) { |
| rd_kafka_topic_partition_t *rktpar; |
| shptr_rd_kafka_toppar_t *s_rktp; |
| |
| rktpar = &assignment->elems[i]; |
| |
| /* Use existing toppar if set */ |
| if (rktpar->_private) |
| continue; |
| |
| s_rktp = rd_kafka_toppar_get2(rkcg->rkcg_rk, |
| rktpar->topic, |
| rktpar->partition, |
| 0/*no-ua*/, 1/*create-on-miss*/); |
| if (s_rktp) |
| rktpar->_private = s_rktp; |
| } |
| |
| rd_kafka_cgrp_version_new_barrier(rkcg); |
| |
| rd_kafka_wrlock(rkcg->rkcg_rk); |
| rkcg->rkcg_c.assignment_size = assignment ? assignment->cnt : 0; |
| rd_kafka_wrunlock(rkcg->rkcg_rk); |
| |
| |
| /* Remove existing assignment (async operation) */ |
| if (rkcg->rkcg_assignment) |
| rd_kafka_cgrp_unassign(rkcg); |
| |
| rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGN", |
| "Group \"%s\": assigning %d partition(s) in join state %s", |
| rkcg->rkcg_group_id->str, assignment ? assignment->cnt : 0, |
| rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]); |
| |
| |
| if (assignment) { |
| rkcg->rkcg_assignment = |
| rd_kafka_topic_partition_list_copy(assignment); |
| |
| /* Mark partition(s) as desired */ |
| for (i = 0 ; i < rkcg->rkcg_assignment->cnt ; i++) { |
| rd_kafka_topic_partition_t *rktpar = |
| &rkcg->rkcg_assignment->elems[i]; |
| shptr_rd_kafka_toppar_t *s_rktp = rktpar->_private; |
| rd_kafka_toppar_t *rktp = |
| rd_kafka_toppar_s2i(s_rktp); |
| rd_kafka_toppar_lock(rktp); |
| rd_kafka_toppar_desired_add0(rktp); |
| rd_kafka_toppar_unlock(rktp); |
| } |
| } |
| |
| if (rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN) |
| return; |
| |
| rd_dassert(rkcg->rkcg_wait_unassign_cnt == 0); |
| |
| rd_kafka_cgrp_set_join_state(rkcg, RD_KAFKA_CGRP_JOIN_STATE_ASSIGNED); |
| |
| if (RD_KAFKA_CGRP_CAN_FETCH_START(rkcg) && rkcg->rkcg_assignment) { |
| /* No existing assignment that needs to be decommissioned, |
| * start partition fetchers right away */ |
| rd_kafka_cgrp_partitions_fetch_start( |
| rkcg, rkcg->rkcg_assignment, 0); |
| } |
| } |
| |
| |
| |
| |
| /** |
| * Handle a rebalance-triggered partition assignment. |
| * |
| * If a rebalance_cb has been registered we enqueue an op for the app |
| * and let the app perform the actual assign() call. |
| * Otherwise we assign() directly from here. |
| * |
| * This provides the most flexibility, allowing the app to perform any |
| * operation it seem fit (e.g., offset writes or reads) before actually |
| * updating the assign():ment. |
| */ |
| static void |
| rd_kafka_cgrp_handle_assignment (rd_kafka_cgrp_t *rkcg, |
| rd_kafka_topic_partition_list_t *assignment) { |
| |
| rd_kafka_rebalance_op(rkcg, RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS, |
| assignment, "new assignment"); |
| } |
| |
| |
| /** |
| * Handle HeartbeatResponse errors. |
| * |
| * If an IllegalGeneration error code is returned in the |
| * HeartbeatResponse, it indicates that the co-ordinator has |
| * initiated a rebalance. The consumer then stops fetching data, |
| * commits offsets and sends a JoinGroupRequest to it's co-ordinator |
| * broker */ |
| void rd_kafka_cgrp_handle_heartbeat_error (rd_kafka_cgrp_t *rkcg, |
| rd_kafka_resp_err_t err) { |
| |
| |
| rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "HEARTBEAT", |
| "Group \"%s\" heartbeat error response in " |
| "state %s (join state %s, %d partition(s) assigned): %s", |
| rkcg->rkcg_group_id->str, |
| rd_kafka_cgrp_state_names[rkcg->rkcg_state], |
| rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state], |
| rkcg->rkcg_assignment ? rkcg->rkcg_assignment->cnt : 0, |
| rd_kafka_err2str(err)); |
| |
| if (rkcg->rkcg_join_state <= RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC) { |
| rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "HEARTBEAT", |
| "Heartbeat response: discarding outdated " |
| "request (now in join-state %s)", |
| rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]); |
| return; |
| } |
| |
| switch (err) |
| { |
| case RD_KAFKA_RESP_ERR__DESTROY: |
| /* quick cleanup */ |
| break; |
| case RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP: |
| case RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE: |
| case RD_KAFKA_RESP_ERR__TRANSPORT: |
| /* Remain in joined state and keep querying for coordinator */ |
| rd_interval_expedite(&rkcg->rkcg_coord_query_intvl, 0); |
| break; |
| |
| case RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID: |
| rd_kafka_cgrp_set_member_id(rkcg, ""); |
| case RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS: |
| case RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION: |
| default: |
| /* Just revert to INIT state if join state is active. */ |
| if (rkcg->rkcg_join_state < |
| RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_REBALANCE_CB || |
| rkcg->rkcg_join_state == |
| RD_KAFKA_CGRP_JOIN_STATE_WAIT_REVOKE_REBALANCE_CB) |
| break; |
| |
| rd_kafka_cgrp_set_join_state(rkcg, RD_KAFKA_CGRP_JOIN_STATE_INIT); |
| |
| if (!(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_UNASSIGN)) { |
| rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_WAIT_UNASSIGN; |
| |
| /* Trigger rebalance_cb */ |
| rd_kafka_rebalance_op( |
| rkcg, RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS, |
| rkcg->rkcg_assignment, rd_kafka_err2str(err)); |
| } |
| break; |
| } |
| } |
| |
| |
| |
| /** |
| * Clean up any group-leader related resources. |
| * |
| * Locality: cgrp thread |
| */ |
| static void rd_kafka_cgrp_group_leader_reset (rd_kafka_cgrp_t *rkcg, |
| const char *reason) { |
| rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "GRPLEADER", |
| "Group \"%.*s\": resetting group leader info: %s", |
| RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), reason); |
| if (rkcg->rkcg_group_leader.protocol) { |
| rd_free(rkcg->rkcg_group_leader.protocol); |
| rkcg->rkcg_group_leader.protocol = NULL; |
| } |
| |
| if (rkcg->rkcg_group_leader.members) { |
| int i; |
| |
| for (i = 0 ; i < rkcg->rkcg_group_leader.member_cnt ; i++) |
| rd_kafka_group_member_clear(&rkcg->rkcg_group_leader. |
| members[i]); |
| rkcg->rkcg_group_leader.member_cnt = 0; |
| rd_free(rkcg->rkcg_group_leader.members); |
| rkcg->rkcg_group_leader.members = NULL; |
| } |
| } |
| |
| |
| |
| |
| /** |
| * Remove existing topic subscription. |
| */ |
| static rd_kafka_resp_err_t |
| rd_kafka_cgrp_unsubscribe (rd_kafka_cgrp_t *rkcg, int leave_group) { |
| |
| rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "UNSUBSCRIBE", |
| "Group \"%.*s\": unsubscribe from current %ssubscription " |
| "of %d topics (leave group=%s, join state %s, v%"PRId32")", |
| RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), |
| rkcg->rkcg_subscription ? "" : "unset ", |
| rkcg->rkcg_subscription ? rkcg->rkcg_subscription->cnt : 0, |
| leave_group ? "yes":"no", |
| rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state], |
| rkcg->rkcg_version); |
| |
| if (rkcg->rkcg_subscription) { |
| rd_kafka_topic_partition_list_destroy(rkcg->rkcg_subscription); |
| rkcg->rkcg_subscription = NULL; |
| } |
| |
| rd_kafka_cgrp_update_subscribed_topics(rkcg, NULL); |
| |
| /* |
| * Clean-up group leader duties, if any. |
| */ |
| rd_kafka_cgrp_group_leader_reset(rkcg, "unsubscribe"); |
| |
| if (leave_group) |
| rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN; |
| |
| |
| |
| /* Remove assignment (async), if any. If there is already an |
| * unassign in progress we dont need to bother. */ |
| if (!RD_KAFKA_CGRP_WAIT_REBALANCE_CB(rkcg) && |
| !(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_UNASSIGN)) { |
| rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_WAIT_UNASSIGN; |
| |
| rd_kafka_rebalance_op(rkcg, |
| RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS, |
| rkcg->rkcg_assignment, "unsubscribe"); |
| } |
| |
| rkcg->rkcg_flags &= ~(RD_KAFKA_CGRP_F_SUBSCRIPTION | |
| RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION); |
| |
| return RD_KAFKA_RESP_ERR_NO_ERROR; |
| } |
| |
| |
| /** |
| * Set new atomic topic subscription. |
| */ |
| static rd_kafka_resp_err_t |
| rd_kafka_cgrp_subscribe (rd_kafka_cgrp_t *rkcg, |
| rd_kafka_topic_partition_list_t *rktparlist) { |
| |
| rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "SUBSCRIBE", |
| "Group \"%.*s\": subscribe to new %ssubscription " |
| "of %d topics (join state %s)", |
| RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), |
| rktparlist ? "" : "unset ", |
| rktparlist ? rktparlist->cnt : 0, |
| rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]); |
| |
| if (rkcg->rkcg_rk->rk_conf.enabled_assignor_cnt == 0) |
| return RD_KAFKA_RESP_ERR__INVALID_ARG; |
| |
| /* Remove existing subscription first */ |
| rd_kafka_cgrp_unsubscribe(rkcg, 0/* dont leave group */); |
| |
| if (!rktparlist) |
| return RD_KAFKA_RESP_ERR_NO_ERROR; |
| |
| rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_SUBSCRIPTION; |
| |
| if (rd_kafka_topic_partition_list_regex_cnt(rktparlist) > 0) |
| rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION; |
| |
| rkcg->rkcg_subscription = rktparlist; |
| |
| rd_kafka_cgrp_join(rkcg); |
| |
| return RD_KAFKA_RESP_ERR_NO_ERROR; |
| } |
| |
| |
| |
| |
| |
| |
| /** |
| * Same as cgrp_terminate() but called from the cgrp/main thread upon receiving |
| * the op 'rko' from cgrp_terminate(). |
| * |
| * NOTE: Takes ownership of 'rko' |
| * |
| * Locality: main thread |
| */ |
| void |
| rd_kafka_cgrp_terminate0 (rd_kafka_cgrp_t *rkcg, rd_kafka_op_t *rko) { |
| |
| rd_kafka_assert(NULL, thrd_is_current(rkcg->rkcg_rk->rk_thread)); |
| |
| rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPTERM", |
| "Terminating group \"%.*s\" in state %s " |
| "with %d partition(s)", |
| RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), |
| rd_kafka_cgrp_state_names[rkcg->rkcg_state], |
| rd_list_cnt(&rkcg->rkcg_toppars)); |
| |
| if (unlikely(rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_TERM || |
| (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE) || |
| rkcg->rkcg_reply_rko != NULL)) { |
| /* Already terminating or handling a previous terminate */ |
| if (rko) { |
| rd_kafka_q_t *rkq = rko->rko_replyq.q; |
| rko->rko_replyq.q = NULL; |
| rd_kafka_q_op_err(rkq, RD_KAFKA_OP_CONSUMER_ERR, |
| RD_KAFKA_RESP_ERR__IN_PROGRESS, |
| rko->rko_replyq.version, |
| NULL, 0, |
| "Group is %s", |
| rkcg->rkcg_reply_rko ? |
| "terminating":"terminated"); |
| rd_kafka_q_destroy(rkq); |
| rd_kafka_op_destroy(rko); |
| } |
| return; |
| } |
| |
| /* Mark for stopping, the actual state transition |
| * is performed when all toppars have left. */ |
| rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_TERMINATE; |
| rkcg->rkcg_ts_terminate = rd_clock(); |
| rkcg->rkcg_reply_rko = rko; |
| |
| if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_SUBSCRIPTION) |
| rd_kafka_cgrp_unsubscribe(rkcg, 1/*leave group*/); |
| |
| /* If there's an oustanding rebalance_cb which has not yet been |
| * served by the application it will be served from consumer_close(). */ |
| if (!RD_KAFKA_CGRP_WAIT_REBALANCE_CB(rkcg) && |
| !(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_UNASSIGN)) |
| rd_kafka_cgrp_unassign(rkcg); |
| |
| /* Try to terminate right away if all preconditions are met. */ |
| rd_kafka_cgrp_try_terminate(rkcg); |
| } |
| |
| |
| /** |
| * Terminate and decommission a cgrp asynchronously. |
| * |
| * Locality: any thread |
| */ |
| void rd_kafka_cgrp_terminate (rd_kafka_cgrp_t *rkcg, rd_kafka_replyq_t replyq) { |
| rd_kafka_assert(NULL, !thrd_is_current(rkcg->rkcg_rk->rk_thread)); |
| rd_kafka_cgrp_op(rkcg, NULL, replyq, RD_KAFKA_OP_TERMINATE, 0); |
| } |
| |
| |
/**
 * State for the OFFSET_COMMIT timeout scan
 * (see rd_kafka_op_offset_commit_timeout_check()).
 */
struct _op_timeout_offset_commit {
        rd_ts_t now;        /**< Scan timestamp: ops whose ts_timeout is
                             *   older than this are expired. */
        rd_kafka_t *rk;     /**< Client instance handle. */
        rd_list_t expired;  /**< Collected expired ops (rd_kafka_op_t *),
                             *   handled after the queue scan to avoid
                             *   recursive queue locking. */
};
| |
| /** |
| * q_filter callback for expiring OFFSET_COMMIT timeouts. |
| */ |
| static int rd_kafka_op_offset_commit_timeout_check (rd_kafka_q_t *rkq, |
| rd_kafka_op_t *rko, |
| void *opaque) { |
| struct _op_timeout_offset_commit *state = |
| (struct _op_timeout_offset_commit*)opaque; |
| |
| if (likely(rko->rko_type != RD_KAFKA_OP_OFFSET_COMMIT || |
| rko->rko_u.offset_commit.ts_timeout == 0 || |
| rko->rko_u.offset_commit.ts_timeout > state->now)) { |
| return 0; |
| } |
| |
| rd_kafka_q_deq0(rkq, rko); |
| |
| /* Add to temporary list to avoid recursive |
| * locking of rkcg_wait_coord_q. */ |
| rd_list_add(&state->expired, rko); |
| return 1; |
| } |
| |
| |
| /** |
| * Scan for various timeouts. |
| */ |
| static void rd_kafka_cgrp_timeout_scan (rd_kafka_cgrp_t *rkcg, rd_ts_t now) { |
| struct _op_timeout_offset_commit ofc_state; |
| int i, cnt = 0; |
| rd_kafka_op_t *rko; |
| |
| ofc_state.now = now; |
| ofc_state.rk = rkcg->rkcg_rk; |
| rd_list_init(&ofc_state.expired, 0, NULL); |
| |
| cnt += rd_kafka_q_apply(rkcg->rkcg_wait_coord_q, |
| rd_kafka_op_offset_commit_timeout_check, |
| &ofc_state); |
| |
| RD_LIST_FOREACH(rko, &ofc_state.expired, i) |
| rd_kafka_cgrp_op_handle_OffsetCommit( |
| rkcg->rkcg_rk, NULL, |
| RD_KAFKA_RESP_ERR__WAIT_COORD, |
| NULL, NULL, rko); |
| |
| rd_list_destroy(&ofc_state.expired); |
| |
| if (cnt > 0) |
| rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPTIMEOUT", |
| "Group \"%.*s\": timed out %d op(s), %d remain", |
| RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), cnt, |
| rd_kafka_q_len(rkcg->rkcg_wait_coord_q)); |
| |
| |
| } |
| |
| |
| /** |
| * @brief Handle cgrp queue op. |
| * @locality rdkafka main thread |
| * @locks none |
| */ |
| static rd_kafka_op_res_t |
| rd_kafka_cgrp_op_serve (rd_kafka_t *rk, rd_kafka_q_t *rkq, |
| rd_kafka_op_t *rko, rd_kafka_q_cb_type_t cb_type, |
| void *opaque) { |
| rd_kafka_cgrp_t *rkcg = opaque; |
| rd_kafka_broker_t *rkb = rkcg->rkcg_rkb; |
| rd_kafka_toppar_t *rktp; |
| rd_kafka_resp_err_t err; |
| const int silent_op = rko->rko_type == RD_KAFKA_OP_RECV_BUF; |
| |
| if (rko->rko_version && rkcg->rkcg_version > rko->rko_version) { |
| rd_kafka_op_destroy(rko); /* outdated */ |
| return RD_KAFKA_OP_RES_HANDLED; |
| } |
| |
| rktp = rko->rko_rktp ? rd_kafka_toppar_s2i(rko->rko_rktp) : NULL; |
| |
| if (rktp && !silent_op) |
| rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPOP", |
| "Group \"%.*s\" received op %s in state %s " |
| "(join state %s, v%"PRId32") " |
| "for %.*s [%"PRId32"]", |
| RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), |
| rd_kafka_op2str(rko->rko_type), |
| rd_kafka_cgrp_state_names[rkcg->rkcg_state], |
| rd_kafka_cgrp_join_state_names[rkcg-> |
| rkcg_join_state], |
| rkcg->rkcg_version, |
| RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
| rktp->rktp_partition); |
| else if (!silent_op) |
| rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPOP", |
| "Group \"%.*s\" received op %s (v%d) in state %s " |
| "(join state %s, v%"PRId32" vs %"PRId32")", |
| RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), |
| rd_kafka_op2str(rko->rko_type), |
| rko->rko_version, |
| rd_kafka_cgrp_state_names[rkcg->rkcg_state], |
| rd_kafka_cgrp_join_state_names[rkcg-> |
| rkcg_join_state], |
| rkcg->rkcg_version, rko->rko_version); |
| |
| switch ((int)rko->rko_type) |
| { |
| case RD_KAFKA_OP_NAME: |
| /* Return the currently assigned member id. */ |
| if (rkcg->rkcg_member_id) |
| rko->rko_u.name.str = |
| RD_KAFKAP_STR_DUP(rkcg->rkcg_member_id); |
| rd_kafka_op_reply(rko, 0); |
| rko = NULL; |
| break; |
| |
| case RD_KAFKA_OP_OFFSET_FETCH: |
| if (rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP || |
| (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE)) { |
| rd_kafka_op_handle_OffsetFetch( |
| rkcg->rkcg_rk, rkb, |
| RD_KAFKA_RESP_ERR__WAIT_COORD, |
| NULL, NULL, rko); |
| rko = NULL; /* rko freed by handler */ |
| break; |
| } |
| |
| rd_kafka_OffsetFetchRequest( |
| rkb, 1, |
| rko->rko_u.offset_fetch.partitions, |
| RD_KAFKA_REPLYQ(rkcg->rkcg_ops, |
| rkcg->rkcg_version), |
| rd_kafka_op_handle_OffsetFetch, rko); |
| rko = NULL; /* rko now owned by request */ |
| break; |
| |
| case RD_KAFKA_OP_PARTITION_JOIN: |
| rd_kafka_cgrp_partition_add(rkcg, rktp); |
| |
| /* If terminating tell the partition to leave */ |
| if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE) |
| rd_kafka_toppar_op_fetch_stop( |
| rktp, RD_KAFKA_NO_REPLYQ); |
| break; |
| |
| case RD_KAFKA_OP_PARTITION_LEAVE: |
| rd_kafka_cgrp_partition_del(rkcg, rktp); |
| break; |
| |
| case RD_KAFKA_OP_FETCH_STOP|RD_KAFKA_OP_REPLY: |
| /* Reply from toppar FETCH_STOP */ |
| rd_kafka_assert(rkcg->rkcg_rk, |
| rkcg->rkcg_wait_unassign_cnt > 0); |
| rkcg->rkcg_wait_unassign_cnt--; |
| |
| rd_kafka_assert(rkcg->rkcg_rk, rktp->rktp_assigned); |
| rd_kafka_assert(rkcg->rkcg_rk, |
| rkcg->rkcg_assigned_cnt > 0); |
| rktp->rktp_assigned = 0; |
| rkcg->rkcg_assigned_cnt--; |
| |
| /* All unassigned toppars now stopped and commit done: |
| * transition to the next state. */ |
| if (rkcg->rkcg_join_state == |
| RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN) |
| rd_kafka_cgrp_check_unassign_done(rkcg, |
| "FETCH_STOP done"); |
| break; |
| |
| case RD_KAFKA_OP_OFFSET_COMMIT: |
| /* Trigger offsets commit. */ |
| rd_kafka_cgrp_offsets_commit(rkcg, rko, |
| /* only set offsets |
| * if no partitions were |
| * specified. */ |
| rko->rko_u.offset_commit. |
| partitions ? 0 : 1, |
| rko->rko_u.offset_commit.reason, |
| 0); |
| rko = NULL; /* rko now owned by request */ |
| break; |
| |
| case RD_KAFKA_OP_COORD_QUERY: |
| rd_kafka_cgrp_coord_query(rkcg, |
| rko->rko_err ? |
| rd_kafka_err2str(rko-> |
| rko_err): |
| "from op"); |
| break; |
| |
| case RD_KAFKA_OP_SUBSCRIBE: |
| /* New atomic subscription (may be NULL) */ |
| err = rd_kafka_cgrp_subscribe( |
| rkcg, rko->rko_u.subscribe.topics); |
| if (!err) |
| rko->rko_u.subscribe.topics = NULL; /* owned by rkcg */ |
| rd_kafka_op_reply(rko, err); |
| rko = NULL; |
| break; |
| |
| case RD_KAFKA_OP_ASSIGN: |
| /* New atomic assignment (payload != NULL), |
| * or unassignment (payload == NULL) */ |
| err = 0; |
| if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE) { |
| /* Treat all assignments as unassign |
| * when terminating. */ |
| rd_kafka_cgrp_unassign(rkcg); |
| if (rko->rko_u.assign.partitions) |
| err = RD_KAFKA_RESP_ERR__DESTROY; |
| } else { |
| rd_kafka_cgrp_assign( |
| rkcg, rko->rko_u.assign.partitions); |
| } |
| rd_kafka_op_reply(rko, err); |
| rko = NULL; |
| break; |
| |
| case RD_KAFKA_OP_GET_SUBSCRIPTION: |
| if (rkcg->rkcg_subscription) |
| rko->rko_u.subscribe.topics = |
| rd_kafka_topic_partition_list_copy( |
| rkcg->rkcg_subscription); |
| rd_kafka_op_reply(rko, 0); |
| rko = NULL; |
| break; |
| |
| case RD_KAFKA_OP_GET_ASSIGNMENT: |
| if (rkcg->rkcg_assignment) |
| rko->rko_u.assign.partitions = |
| rd_kafka_topic_partition_list_copy( |
| rkcg->rkcg_assignment); |
| |
| rd_kafka_op_reply(rko, 0); |
| rko = NULL; |
| break; |
| |
| case RD_KAFKA_OP_TERMINATE: |
| rd_kafka_cgrp_terminate0(rkcg, rko); |
| rko = NULL; /* terminate0() takes ownership */ |
| break; |
| |
| default: |
| rd_kafka_assert(rkcg->rkcg_rk, !*"unknown type"); |
| break; |
| } |
| |
| if (rko) |
| rd_kafka_op_destroy(rko); |
| |
| return RD_KAFKA_OP_RES_HANDLED; |
| } |
| |
| |
| /** |
| * Client group's join state handling |
| */ |
| static void rd_kafka_cgrp_join_state_serve (rd_kafka_cgrp_t *rkcg, |
| rd_kafka_broker_t *rkb) { |
| |
| if (0) // FIXME |
| rd_rkb_dbg(rkb, CGRP, "JOINFSM", |
| "Group \"%s\" in join state %s with%s subscription", |
| rkcg->rkcg_group_id->str, |
| rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state], |
| rkcg->rkcg_subscription ? "" : "out"); |
| |
| switch (rkcg->rkcg_join_state) |
| { |
| case RD_KAFKA_CGRP_JOIN_STATE_INIT: |
| /* If we have a subscription start the join process. */ |
| if (!rkcg->rkcg_subscription) |
| break; |
| |
| if (rd_interval_immediate(&rkcg->rkcg_join_intvl, |
| 1000*1000, 0) > 0) |
| rd_kafka_cgrp_join(rkcg); |
| break; |
| |
| case RD_KAFKA_CGRP_JOIN_STATE_WAIT_JOIN: |
| case RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA: |
| case RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC: |
| case RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN: |
| case RD_KAFKA_CGRP_JOIN_STATE_WAIT_REVOKE_REBALANCE_CB: |
| break; |
| |
| case RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_REBALANCE_CB: |
| case RD_KAFKA_CGRP_JOIN_STATE_ASSIGNED: |
| case RD_KAFKA_CGRP_JOIN_STATE_STARTED: |
| if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_SUBSCRIPTION && |
| rd_interval(&rkcg->rkcg_heartbeat_intvl, |
| rkcg->rkcg_rk->rk_conf. |
| group_heartbeat_intvl_ms * 1000, 0) > 0) |
| rd_kafka_cgrp_heartbeat(rkcg, rkb); |
| break; |
| } |
| |
| } |
| /** |
| * Client group handling. |
| * Called from main thread to serve the operational aspects of a cgrp. |
| */ |
| void rd_kafka_cgrp_serve (rd_kafka_cgrp_t *rkcg) { |
| rd_kafka_broker_t *rkb = rkcg->rkcg_rkb; |
| int rkb_state = RD_KAFKA_BROKER_STATE_INIT; |
| rd_ts_t now; |
| |
| if (rkb) { |
| rd_kafka_broker_lock(rkb); |
| rkb_state = rkb->rkb_state; |
| rd_kafka_broker_unlock(rkb); |
| |
| /* Go back to querying state if we lost the current coordinator |
| * connection. */ |
| if (rkb_state < RD_KAFKA_BROKER_STATE_UP && |
| rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_UP) |
| rd_kafka_cgrp_set_state(rkcg, |
| RD_KAFKA_CGRP_STATE_QUERY_COORD); |
| } |
| |
| now = rd_clock(); |
| |
| /* Check for cgrp termination */ |
| if (unlikely(rd_kafka_cgrp_try_terminate(rkcg))) { |
| rd_kafka_cgrp_terminated(rkcg); |
| return; /* cgrp terminated */ |
| } |
| |
| /* Bail out if we're terminating. */ |
| if (unlikely(rd_kafka_terminating(rkcg->rkcg_rk))) |
| return; |
| |
| retry: |
| switch (rkcg->rkcg_state) |
| { |
| case RD_KAFKA_CGRP_STATE_TERM: |
| break; |
| |
| case RD_KAFKA_CGRP_STATE_INIT: |
| rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_QUERY_COORD); |
| /* FALLTHRU */ |
| |
| case RD_KAFKA_CGRP_STATE_QUERY_COORD: |
| /* Query for coordinator. */ |
| if (rd_interval_immediate(&rkcg->rkcg_coord_query_intvl, |
| 500*1000, now) > 0) |
| rd_kafka_cgrp_coord_query(rkcg, |
| "intervaled in " |
| "state query-coord"); |
| break; |
| |
| case RD_KAFKA_CGRP_STATE_WAIT_COORD: |
| /* Waiting for GroupCoordinator response */ |
| break; |
| |
| case RD_KAFKA_CGRP_STATE_WAIT_BROKER: |
| /* See if the group should be reassigned to another broker. */ |
| if (rd_kafka_cgrp_reassign_broker(rkcg)) |
| goto retry; /* broker reassigned, retry state-machine |
| * to speed up next transition. */ |
| |
| /* Coordinator query */ |
| if (rd_interval(&rkcg->rkcg_coord_query_intvl, |
| 1000*1000, now) > 0) |
| rd_kafka_cgrp_coord_query(rkcg, |
| "intervaled in " |
| "state wait-broker"); |
| break; |
| |
| case RD_KAFKA_CGRP_STATE_WAIT_BROKER_TRANSPORT: |
| /* Waiting for broker transport to come up. |
| * Also make sure broker supports groups. */ |
| if (rkb_state < RD_KAFKA_BROKER_STATE_UP || !rkb || |
| !rd_kafka_broker_supports( |
| rkb, RD_KAFKA_FEATURE_BROKER_GROUP_COORD)) { |
| /* Coordinator query */ |
| if (rd_interval(&rkcg->rkcg_coord_query_intvl, |
| 1000*1000, now) > 0) |
| rd_kafka_cgrp_coord_query( |
| rkcg, |
| "intervaled in state " |
| "wait-broker-transport"); |
| |
| } else { |
| rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_UP); |
| |
| /* Serve join state to trigger (re)join */ |
| rd_kafka_cgrp_join_state_serve(rkcg, rkb); |
| |
| /* Start fetching if we have an assignment. */ |
| if (rkcg->rkcg_assignment && |
| RD_KAFKA_CGRP_CAN_FETCH_START(rkcg)) |
| rd_kafka_cgrp_partitions_fetch_start( |
| rkcg, rkcg->rkcg_assignment, 0); |
| } |
| break; |
| |
| case RD_KAFKA_CGRP_STATE_UP: |
| /* Move any ops awaiting the coordinator to the ops queue |
| * for reprocessing. */ |
| rd_kafka_q_concat(rkcg->rkcg_ops, rkcg->rkcg_wait_coord_q); |
| |
| /* Relaxed coordinator queries. */ |
| if (rd_interval(&rkcg->rkcg_coord_query_intvl, |
| rkcg->rkcg_rk->rk_conf. |
| coord_query_intvl_ms * 1000, now) > 0) |
| rd_kafka_cgrp_coord_query(rkcg, |
| "intervaled in state up"); |
| |
| rd_kafka_cgrp_join_state_serve(rkcg, rkb); |
| break; |
| |
| } |
| |
| if (unlikely(rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP && |
| rd_interval(&rkcg->rkcg_timeout_scan_intvl, |
| 1000*1000, now) > 0)) |
| rd_kafka_cgrp_timeout_scan(rkcg, now); |
| } |
| |
| |
| |
| |
| |
| /** |
| * Send an op to a cgrp. |
| * |
| * Locality: any thread |
| */ |
| void rd_kafka_cgrp_op (rd_kafka_cgrp_t *rkcg, rd_kafka_toppar_t *rktp, |
| rd_kafka_replyq_t replyq, rd_kafka_op_type_t type, |
| rd_kafka_resp_err_t err) { |
| rd_kafka_op_t *rko; |
| |
| rko = rd_kafka_op_new(type); |
| rko->rko_err = err; |
| rko->rko_replyq = replyq; |
| |
| if (rktp) |
| rko->rko_rktp = rd_kafka_toppar_keep(rktp); |
| |
| rd_kafka_q_enq(rkcg->rkcg_ops, rko); |
| } |
| |
| |
| |
| |
| |
| |
| |
| void rd_kafka_cgrp_set_member_id (rd_kafka_cgrp_t *rkcg, const char *member_id){ |
| if (rkcg->rkcg_member_id && member_id && |
| !rd_kafkap_str_cmp_str(rkcg->rkcg_member_id, member_id)) |
| return; /* No change */ |
| |
| rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "MEMBERID", |
| "Group \"%.*s\": updating member id \"%s\" -> \"%s\"", |
| RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), |
| rkcg->rkcg_member_id ? |
| rkcg->rkcg_member_id->str : "(not-set)", |
| member_id ? member_id : "(not-set)"); |
| |
| if (rkcg->rkcg_member_id) { |
| rd_kafkap_str_destroy(rkcg->rkcg_member_id); |
| rkcg->rkcg_member_id = NULL; |
| } |
| |
| if (member_id) |
| rkcg->rkcg_member_id = rd_kafkap_str_new(member_id, -1); |
| } |
| |
| |
| |
| |
| /** |
| * @brief Check if the latest metadata affects the current subscription: |
| * - matched topic added |
| * - matched topic removed |
| * - matched topic's partition count change |
| * |
| * @locks none |
| * @locality rdkafka main thread |
| */ |
| void rd_kafka_cgrp_metadata_update_check (rd_kafka_cgrp_t *rkcg, int do_join) { |
| rd_list_t *tinfos; |
| |
| rd_kafka_assert(NULL, thrd_is_current(rkcg->rkcg_rk->rk_thread)); |
| |
| if (!rkcg->rkcg_subscription || rkcg->rkcg_subscription->cnt == 0) |
| return; |
| |
| /* |
| * Create a list of the topics in metadata that matches our subscription |
| */ |
| tinfos = rd_list_new(rkcg->rkcg_subscription->cnt, |
| (void *)rd_kafka_topic_info_destroy); |
| |
| if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION) |
| rd_kafka_metadata_topic_match(rkcg->rkcg_rk, |
| tinfos, rkcg->rkcg_subscription); |
| else |
| rd_kafka_metadata_topic_filter(rkcg->rkcg_rk, |
| tinfos, |
| rkcg->rkcg_subscription); |
| |
| |
| /* |
| * Update (takes ownership of \c tinfos) |
| */ |
| if (rd_kafka_cgrp_update_subscribed_topics(rkcg, tinfos) && do_join) { |
| /* List of subscribed topics changed, trigger rejoin. */ |
| rd_kafka_dbg(rkcg->rkcg_rk, CGRP|RD_KAFKA_DBG_METADATA, "REJOIN", |
| "Group \"%.*s\": " |
| "subscription updated from metadata change: " |
| "rejoining group", |
| RD_KAFKAP_STR_PR(rkcg->rkcg_group_id)); |
| rd_kafka_cgrp_rejoin(rkcg); |
| } |
| } |
| |
| |
| void rd_kafka_cgrp_handle_SyncGroup (rd_kafka_cgrp_t *rkcg, |
| rd_kafka_broker_t *rkb, |
| rd_kafka_resp_err_t err, |
| const rd_kafkap_bytes_t *member_state) { |
| rd_kafka_buf_t *rkbuf = NULL; |
| rd_kafka_topic_partition_list_t *assignment; |
| const int log_decode_errors = LOG_ERR; |
| int16_t Version; |
| int32_t TopicCnt; |
| rd_kafkap_bytes_t UserData; |
| |
| /* Dont handle new assignments when terminating */ |
| if (!err && rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE) |
| err = RD_KAFKA_RESP_ERR__DESTROY; |
| |
| if (err) |
| goto err; |
| |
| |
| if (RD_KAFKAP_BYTES_LEN(member_state) == 0) { |
| /* Empty assignment. */ |
| assignment = rd_kafka_topic_partition_list_new(0); |
| memset(&UserData, 0, sizeof(UserData)); |
| goto done; |
| } |
| |
| /* Parse assignment from MemberState */ |
| rkbuf = rd_kafka_buf_new_shadow(member_state->data, |
| RD_KAFKAP_BYTES_LEN(member_state), |
| NULL); |
| /* Protocol parser needs a broker handle to log errors on. */ |
| if (rkb) { |
| rkbuf->rkbuf_rkb = rkb; |
| rd_kafka_broker_keep(rkb); |
| } else |
| rkbuf->rkbuf_rkb = rd_kafka_broker_internal(rkcg->rkcg_rk); |
| |
| rd_kafka_buf_read_i16(rkbuf, &Version); |
| rd_kafka_buf_read_i32(rkbuf, &TopicCnt); |
| |
| if (TopicCnt > 10000) { |
| err = RD_KAFKA_RESP_ERR__BAD_MSG; |
| goto err; |
| } |
| |
| assignment = rd_kafka_topic_partition_list_new(TopicCnt); |
| while (TopicCnt-- > 0) { |
| rd_kafkap_str_t Topic; |
| int32_t PartCnt; |
| rd_kafka_buf_read_str(rkbuf, &Topic); |
| rd_kafka_buf_read_i32(rkbuf, &PartCnt); |
| while (PartCnt-- > 0) { |
| int32_t Partition; |
| char *topic_name; |
| RD_KAFKAP_STR_DUPA(&topic_name, &Topic); |
| rd_kafka_buf_read_i32(rkbuf, &Partition); |
| |
| rd_kafka_topic_partition_list_add( |
| assignment, topic_name, Partition); |
| } |
| } |
| |
| rd_kafka_buf_read_bytes(rkbuf, &UserData); |
| |
| done: |
| /* Set the new assignment */ |
| rd_kafka_cgrp_handle_assignment(rkcg, assignment); |
| |
| rd_kafka_topic_partition_list_destroy(assignment); |
| |
| if (rkbuf) |
| rd_kafka_buf_destroy(rkbuf); |
| |
| return; |
| |
| err_parse: |
| err = rkbuf->rkbuf_err; |
| |
| err: |
| if (rkbuf) |
| rd_kafka_buf_destroy(rkbuf); |
| |
| rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "GRPSYNC", |
| "Group \"%s\": synchronization failed: %s: rejoining", |
| rkcg->rkcg_group_id->str, rd_kafka_err2str(err)); |
| rd_kafka_cgrp_set_join_state(rkcg, RD_KAFKA_CGRP_JOIN_STATE_INIT); |
| } |