| /* |
| * librdkafka - Apache Kafka C library |
| * |
| * Copyright (c) 2012-2015, Magnus Edenhill |
| * All rights reserved. |
| * |
| * Redistribution and use in source and binary forms, with or without |
| * modification, are permitted provided that the following conditions are met: |
| * |
| * 1. Redistributions of source code must retain the above copyright notice, |
| * this list of conditions and the following disclaimer. |
| * 2. Redistributions in binary form must reproduce the above copyright notice, |
| * this list of conditions and the following disclaimer in the documentation |
| * and/or other materials provided with the distribution. |
| * |
| * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
| * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
| * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
| * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
| * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
| * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
| * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
| * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
| * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
| * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
| * POSSIBILITY OF SUCH DAMAGE. |
| */ |
| |
| #include <stdarg.h> |
| |
| #include "rdkafka_int.h" |
| #include "rdkafka_request.h" |
| #include "rdkafka_broker.h" |
| #include "rdkafka_offset.h" |
| #include "rdkafka_topic.h" |
| #include "rdkafka_partition.h" |
| #include "rdkafka_metadata.h" |
| #include "rdkafka_msgset.h" |
| |
| #include "rdrand.h" |
| |
| /** |
| * Kafka protocol request and response handling. |
| * All of this code runs in the broker thread and uses op queues for |
| * propagating results back to the various sub-systems operating in |
| * other threads. |
| */ |
| |
| |
| /** |
| * @brief Decide action(s) to take based on the returned error code. |
| * |
| * The optional var-args is a .._ACTION_END terminated list |
| * of action,error tuples which overrides the general behaviour. |
| * It is to be read as: for \p error, return \p action(s). |
| */ |
| int rd_kafka_err_action (rd_kafka_broker_t *rkb, |
| rd_kafka_resp_err_t err, |
| rd_kafka_buf_t *rkbuf, |
| rd_kafka_buf_t *request, ...) { |
| va_list ap; |
| int actions = 0; |
| int exp_act; |
| |
| /* Match explicitly defined error mappings first. */ |
| va_start(ap, request); |
| while ((exp_act = va_arg(ap, int))) { |
| int exp_err = va_arg(ap, int); |
| |
| if (err == exp_err) |
| actions |= exp_act; |
| } |
| va_end(ap); |
| |
| if (err && rkb && request) |
| rd_rkb_dbg(rkb, BROKER, "REQERR", |
| "%sRequest failed: %s: explicit actions 0x%x", |
| rd_kafka_ApiKey2str(request->rkbuf_reqhdr.ApiKey), |
| rd_kafka_err2str(err), actions); |
| |
| /* Explicit error match. */ |
| if (actions) |
| return actions; |
| |
| /* Default error matching */ |
| switch (err) |
| { |
| case RD_KAFKA_RESP_ERR_NO_ERROR: |
| break; |
| case RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE: |
| case RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION: |
| case RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE: |
| case RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE: |
| case RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE: |
| case RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP: |
| case RD_KAFKA_RESP_ERR__WAIT_COORD: |
| /* Request metadata information update */ |
| actions |= RD_KAFKA_ERR_ACTION_REFRESH; |
| break; |
| case RD_KAFKA_RESP_ERR__TIMED_OUT: |
| case RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT: |
| /* Broker-side request handling timeout */ |
| case RD_KAFKA_RESP_ERR__TRANSPORT: |
| /* Broker connection down */ |
| actions |= RD_KAFKA_ERR_ACTION_RETRY; |
| break; |
| case RD_KAFKA_RESP_ERR__DESTROY: |
| case RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT: |
| case RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE: |
| default: |
| actions |= RD_KAFKA_ERR_ACTION_PERMANENT; |
| break; |
| } |
| |
| return actions; |
| } |
| |
| |
| /** |
| * Send GroupCoordinatorRequest |
| */ |
| void rd_kafka_GroupCoordinatorRequest (rd_kafka_broker_t *rkb, |
| const rd_kafkap_str_t *cgrp, |
| rd_kafka_replyq_t replyq, |
| rd_kafka_resp_cb_t *resp_cb, |
| void *opaque) { |
| rd_kafka_buf_t *rkbuf; |
| |
| rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_GroupCoordinator, 1, |
| RD_KAFKAP_STR_SIZE(cgrp)); |
| rd_kafka_buf_write_kstr(rkbuf, cgrp); |
| |
| rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); |
| } |
| |
| |
| |
| |
| /** |
| * @brief Parses and handles Offset replies. |
| * |
| * Returns the parsed offsets (and errors) in \p offsets |
| * |
| * @returns 0 on success, else an error. |
| */ |
| rd_kafka_resp_err_t rd_kafka_handle_Offset (rd_kafka_t *rk, |
| rd_kafka_broker_t *rkb, |
| rd_kafka_resp_err_t err, |
| rd_kafka_buf_t *rkbuf, |
| rd_kafka_buf_t *request, |
| rd_kafka_topic_partition_list_t |
| *offsets) { |
| |
| const int log_decode_errors = LOG_ERR; |
| int16_t ErrorCode = 0; |
| int32_t TopicArrayCnt; |
| int actions; |
| int16_t api_version; |
| |
| if (err) { |
| ErrorCode = err; |
| goto err; |
| } |
| |
| api_version = request->rkbuf_reqhdr.ApiVersion; |
| |
| /* NOTE: |
| * Broker may return offsets in a different constellation than |
| * in the original request .*/ |
| |
| rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt); |
| while (TopicArrayCnt-- > 0) { |
| rd_kafkap_str_t ktopic; |
| int32_t PartArrayCnt; |
| char *topic_name; |
| |
| rd_kafka_buf_read_str(rkbuf, &ktopic); |
| rd_kafka_buf_read_i32(rkbuf, &PartArrayCnt); |
| |
| RD_KAFKAP_STR_DUPA(&topic_name, &ktopic); |
| |
| while (PartArrayCnt-- > 0) { |
| int32_t kpartition; |
| int32_t OffsetArrayCnt; |
| int64_t Offset = -1; |
| rd_kafka_topic_partition_t *rktpar; |
| |
| rd_kafka_buf_read_i32(rkbuf, &kpartition); |
| rd_kafka_buf_read_i16(rkbuf, &ErrorCode); |
| |
| if (api_version == 1) { |
| int64_t Timestamp; |
| rd_kafka_buf_read_i64(rkbuf, &Timestamp); |
| rd_kafka_buf_read_i64(rkbuf, &Offset); |
| } else if (api_version == 0) { |
| rd_kafka_buf_read_i32(rkbuf, &OffsetArrayCnt); |
| /* We only request one offset so just grab |
| * the first one. */ |
| while (OffsetArrayCnt-- > 0) |
| rd_kafka_buf_read_i64(rkbuf, &Offset); |
| } else { |
| rd_kafka_assert(NULL, !*"NOTREACHED"); |
| } |
| |
| rktpar = rd_kafka_topic_partition_list_add( |
| offsets, topic_name, kpartition); |
| rktpar->err = ErrorCode; |
| rktpar->offset = Offset; |
| } |
| } |
| |
| goto done; |
| |
| err_parse: |
| ErrorCode = rkbuf->rkbuf_err; |
| err: |
| actions = rd_kafka_err_action( |
| rkb, ErrorCode, rkbuf, request, |
| RD_KAFKA_ERR_ACTION_PERMANENT, |
| RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART, |
| |
| RD_KAFKA_ERR_ACTION_REFRESH|RD_KAFKA_ERR_ACTION_RETRY, |
| RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION, |
| |
| RD_KAFKA_ERR_ACTION_END); |
| |
| if (actions & RD_KAFKA_ERR_ACTION_REFRESH) { |
| char tmp[256]; |
| /* Re-query for leader */ |
| rd_snprintf(tmp, sizeof(tmp), |
| "OffsetRequest failed: %s", |
| rd_kafka_err2str(ErrorCode)); |
| rd_kafka_metadata_refresh_known_topics(rk, NULL, 1/*force*/, |
| tmp); |
| } |
| |
| if (actions & RD_KAFKA_ERR_ACTION_RETRY) { |
| if (rd_kafka_buf_retry(rkb, request)) |
| return RD_KAFKA_RESP_ERR__IN_PROGRESS; |
| /* FALLTHRU */ |
| } |
| |
| done: |
| return ErrorCode; |
| } |
| |
| |
| |
| |
| |
| |
| /** |
| * Send OffsetRequest for toppar 'rktp'. |
| */ |
| void rd_kafka_OffsetRequest (rd_kafka_broker_t *rkb, |
| rd_kafka_topic_partition_list_t *partitions, |
| int16_t api_version, |
| rd_kafka_replyq_t replyq, |
| rd_kafka_resp_cb_t *resp_cb, |
| void *opaque) { |
| rd_kafka_buf_t *rkbuf; |
| int i; |
| size_t of_TopicArrayCnt = 0, of_PartArrayCnt = 0; |
| const char *last_topic = ""; |
| int32_t topic_cnt = 0, part_cnt = 0; |
| |
| rd_kafka_topic_partition_list_sort_by_topic(partitions); |
| |
| rkbuf = rd_kafka_buf_new_request( |
| rkb, RD_KAFKAP_Offset, 1, |
| /* ReplicaId+TopicArrayCnt+Topic */ |
| 4+4+100+ |
| /* PartArrayCnt */ |
| 4 + |
| /* partition_cnt * Partition+Time+MaxNumOffs */ |
| (partitions->cnt * (4+8+4))); |
| |
| /* ReplicaId */ |
| rd_kafka_buf_write_i32(rkbuf, -1); |
| /* TopicArrayCnt */ |
| of_TopicArrayCnt = rd_kafka_buf_write_i32(rkbuf, 0); /* updated later */ |
| |
| for (i = 0 ; i < partitions->cnt ; i++) { |
| const rd_kafka_topic_partition_t *rktpar = &partitions->elems[i]; |
| |
| if (strcmp(rktpar->topic, last_topic)) { |
| /* Finish last topic, if any. */ |
| if (of_PartArrayCnt > 0) |
| rd_kafka_buf_update_i32(rkbuf, |
| of_PartArrayCnt, |
| part_cnt); |
| |
| /* Topic */ |
| rd_kafka_buf_write_str(rkbuf, rktpar->topic, -1); |
| topic_cnt++; |
| last_topic = rktpar->topic; |
| /* New topic so reset partition count */ |
| part_cnt = 0; |
| |
| /* PartitionArrayCnt: updated later */ |
| of_PartArrayCnt = rd_kafka_buf_write_i32(rkbuf, 0); |
| } |
| |
| /* Partition */ |
| rd_kafka_buf_write_i32(rkbuf, rktpar->partition); |
| part_cnt++; |
| |
| /* Time/Offset */ |
| rd_kafka_buf_write_i64(rkbuf, rktpar->offset); |
| |
| if (api_version == 0) { |
| /* MaxNumberOfOffsets */ |
| rd_kafka_buf_write_i32(rkbuf, 1); |
| } |
| } |
| |
| if (of_PartArrayCnt > 0) { |
| rd_kafka_buf_update_i32(rkbuf, of_PartArrayCnt, part_cnt); |
| rd_kafka_buf_update_i32(rkbuf, of_TopicArrayCnt, topic_cnt); |
| } |
| |
| rd_kafka_buf_ApiVersion_set(rkbuf, api_version, |
| api_version == 1 ? |
| RD_KAFKA_FEATURE_OFFSET_TIME : 0); |
| |
| rd_rkb_dbg(rkb, TOPIC, "OFFSET", |
| "OffsetRequest (v%hd, opv %d) " |
| "for %"PRId32" topic(s) and %"PRId32" partition(s)", |
| api_version, rkbuf->rkbuf_replyq.version, |
| topic_cnt, partitions->cnt); |
| |
| rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); |
| } |
| |
| |
| /** |
| * Generic handler for OffsetFetch responses. |
| * Offsets for included partitions will be propagated through the passed |
| * 'offsets' list. |
| * |
| * \p update_toppar: update toppar's committed_offset |
| */ |
| rd_kafka_resp_err_t |
| rd_kafka_handle_OffsetFetch (rd_kafka_t *rk, |
| rd_kafka_broker_t *rkb, |
| rd_kafka_resp_err_t err, |
| rd_kafka_buf_t *rkbuf, |
| rd_kafka_buf_t *request, |
| rd_kafka_topic_partition_list_t *offsets, |
| int update_toppar) { |
| const int log_decode_errors = LOG_ERR; |
| int32_t TopicArrayCnt; |
| int64_t offset = RD_KAFKA_OFFSET_INVALID; |
| rd_kafkap_str_t metadata; |
| int i; |
| int actions; |
| int seen_cnt = 0; |
| |
| if (err) |
| goto err; |
| |
| /* Set default offset for all partitions. */ |
| rd_kafka_topic_partition_list_set_offsets(rkb->rkb_rk, offsets, 0, |
| RD_KAFKA_OFFSET_INVALID, |
| 0 /* !is commit */); |
| |
| rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt); |
| for (i = 0 ; i < TopicArrayCnt ; i++) { |
| rd_kafkap_str_t topic; |
| int32_t PartArrayCnt; |
| char *topic_name; |
| int j; |
| |
| rd_kafka_buf_read_str(rkbuf, &topic); |
| rd_kafka_buf_read_i32(rkbuf, &PartArrayCnt); |
| |
| RD_KAFKAP_STR_DUPA(&topic_name, &topic); |
| |
| for (j = 0 ; j < PartArrayCnt ; j++) { |
| int32_t partition; |
| shptr_rd_kafka_toppar_t *s_rktp; |
| rd_kafka_topic_partition_t *rktpar; |
| int16_t err2; |
| |
| rd_kafka_buf_read_i32(rkbuf, &partition); |
| rd_kafka_buf_read_i64(rkbuf, &offset); |
| rd_kafka_buf_read_str(rkbuf, &metadata); |
| rd_kafka_buf_read_i16(rkbuf, &err2); |
| |
| rktpar = rd_kafka_topic_partition_list_find(offsets, |
| topic_name, |
| partition); |
| if (!rktpar) { |
| rd_rkb_dbg(rkb, TOPIC, "OFFSETFETCH", |
| "OffsetFetchResponse: %s [%"PRId32"] " |
| "not found in local list: ignoring", |
| topic_name, partition); |
| continue; |
| } |
| |
| seen_cnt++; |
| |
| if (!(s_rktp = rktpar->_private)) { |
| s_rktp = rd_kafka_toppar_get2(rkb->rkb_rk, |
| topic_name, |
| partition, 0, 0); |
| /* May be NULL if topic is not locally known */ |
| rktpar->_private = s_rktp; |
| } |
| |
| /* broker reports invalid offset as -1 */ |
| if (offset == -1) |
| rktpar->offset = RD_KAFKA_OFFSET_INVALID; |
| else |
| rktpar->offset = offset; |
| rktpar->err = err2; |
| |
| rd_rkb_dbg(rkb, TOPIC, "OFFSETFETCH", |
| "OffsetFetchResponse: %s [%"PRId32"] offset %"PRId64, |
| topic_name, partition, offset); |
| |
| if (update_toppar && !err2 && s_rktp) { |
| rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp); |
| /* Update toppar's committed offset */ |
| rd_kafka_toppar_lock(rktp); |
| rktp->rktp_committed_offset = rktpar->offset; |
| rd_kafka_toppar_unlock(rktp); |
| } |
| |
| |
| if (rktpar->metadata) |
| rd_free(rktpar->metadata); |
| |
| if (RD_KAFKAP_STR_IS_NULL(&metadata)) { |
| rktpar->metadata = NULL; |
| rktpar->metadata_size = 0; |
| } else { |
| rktpar->metadata = RD_KAFKAP_STR_DUP(&metadata); |
| rktpar->metadata_size = |
| RD_KAFKAP_STR_LEN(&metadata); |
| } |
| } |
| } |
| |
| |
| err: |
| rd_rkb_dbg(rkb, TOPIC, "OFFFETCH", |
| "OffsetFetch for %d/%d partition(s) returned %s", |
| seen_cnt, |
| offsets ? offsets->cnt : -1, rd_kafka_err2str(err)); |
| |
| actions = rd_kafka_err_action(rkb, err, rkbuf, request, |
| RD_KAFKA_ERR_ACTION_END); |
| |
| if (actions & RD_KAFKA_ERR_ACTION_REFRESH) { |
| /* Re-query for coordinator */ |
| rd_kafka_cgrp_op(rkb->rkb_rk->rk_cgrp, NULL, |
| RD_KAFKA_NO_REPLYQ, |
| RD_KAFKA_OP_COORD_QUERY, err); |
| if (request) { |
| /* Schedule a retry */ |
| rd_kafka_buf_keep(request); |
| rd_kafka_broker_buf_retry(request->rkbuf_rkb, request); |
| } |
| } |
| |
| return err; |
| |
| err_parse: |
| err = rkbuf->rkbuf_err; |
| goto err; |
| } |
| |
| |
| |
| /** |
| * opaque=rko wrapper for handle_OffsetFetch. |
| * rko->rko_payload MUST be a `rd_kafka_topic_partition_list_t *` which will |
| * be filled in with fetch offsets. |
| * |
| * A reply will be sent on 'rko->rko_replyq' with type RD_KAFKA_OP_OFFSET_FETCH. |
| * |
| * Locality: cgrp's broker thread |
| */ |
| void rd_kafka_op_handle_OffsetFetch (rd_kafka_t *rk, |
| rd_kafka_broker_t *rkb, |
| rd_kafka_resp_err_t err, |
| rd_kafka_buf_t *rkbuf, |
| rd_kafka_buf_t *request, |
| void *opaque) { |
| rd_kafka_op_t *rko = opaque; |
| rd_kafka_op_t *rko_reply; |
| rd_kafka_topic_partition_list_t *offsets; |
| |
| RD_KAFKA_OP_TYPE_ASSERT(rko, RD_KAFKA_OP_OFFSET_FETCH); |
| |
| if (err == RD_KAFKA_RESP_ERR__DESTROY) { |
| /* Termination, quick cleanup. */ |
| rd_kafka_op_destroy(rko); |
| return; |
| } |
| |
| offsets = rd_kafka_topic_partition_list_copy( |
| rko->rko_u.offset_fetch.partitions); |
| |
| rko_reply = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_FETCH|RD_KAFKA_OP_REPLY); |
| rko_reply->rko_err = err; |
| rko_reply->rko_u.offset_fetch.partitions = offsets; |
| rko_reply->rko_u.offset_fetch.do_free = 1; |
| if (rko->rko_rktp) |
| rko_reply->rko_rktp = rd_kafka_toppar_keep( |
| rd_kafka_toppar_s2i(rko->rko_rktp)); |
| |
| /* If all partitions already had usable offsets then there |
| * was no request sent and thus no reply, the offsets list is |
| * good to go. */ |
| if (rkbuf) |
| rd_kafka_handle_OffsetFetch(rkb->rkb_rk, rkb, err, rkbuf, |
| request, offsets, 0); |
| |
| rd_kafka_replyq_enq(&rko->rko_replyq, rko_reply, 0); |
| |
| rd_kafka_op_destroy(rko); |
| } |
| |
| |
| |
| |
| |
| |
| /** |
| * Send OffsetFetchRequest for toppar. |
| * |
| * Any partition with a usable offset will be ignored, if all partitions |
| * have usable offsets then no request is sent at all but an empty |
| * reply is enqueued on the replyq. |
| */ |
| void rd_kafka_OffsetFetchRequest (rd_kafka_broker_t *rkb, |
| int16_t api_version, |
| rd_kafka_topic_partition_list_t *parts, |
| rd_kafka_replyq_t replyq, |
| rd_kafka_resp_cb_t *resp_cb, |
| void *opaque) { |
| rd_kafka_buf_t *rkbuf; |
| size_t of_TopicCnt; |
| int TopicCnt = 0; |
| ssize_t of_PartCnt = -1; |
| const char *last_topic = NULL; |
| int PartCnt = 0; |
| int tot_PartCnt = 0; |
| int i; |
| |
| rkbuf = rd_kafka_buf_new_request( |
| rkb, RD_KAFKAP_OffsetFetch, 1, |
| RD_KAFKAP_STR_SIZE(rkb->rkb_rk->rk_group_id) + |
| 4 + |
| (parts->cnt * 32)); |
| |
| |
| /* ConsumerGroup */ |
| rd_kafka_buf_write_kstr(rkbuf, rkb->rkb_rk->rk_group_id); |
| |
| /* Sort partitions by topic */ |
| rd_kafka_topic_partition_list_sort_by_topic(parts); |
| |
| /* TopicArrayCnt */ |
| of_TopicCnt = rd_kafka_buf_write_i32(rkbuf, 0); /* Updated later */ |
| |
| for (i = 0 ; i < parts->cnt ; i++) { |
| rd_kafka_topic_partition_t *rktpar = &parts->elems[i]; |
| |
| /* Ignore partitions with a usable offset. */ |
| if (rktpar->offset != RD_KAFKA_OFFSET_INVALID && |
| rktpar->offset != RD_KAFKA_OFFSET_STORED) { |
| rd_rkb_dbg(rkb, TOPIC, "OFFSET", |
| "OffsetFetchRequest: skipping %s [%"PRId32"] " |
| "with valid offset %s", |
| rktpar->topic, rktpar->partition, |
| rd_kafka_offset2str(rktpar->offset)); |
| continue; |
| } |
| |
| if (last_topic == NULL || strcmp(last_topic, rktpar->topic)) { |
| /* New topic */ |
| |
| /* Finalize previous PartitionCnt */ |
| if (PartCnt > 0) |
| rd_kafka_buf_update_u32(rkbuf, of_PartCnt, |
| PartCnt); |
| |
| /* TopicName */ |
| rd_kafka_buf_write_str(rkbuf, rktpar->topic, -1); |
| /* PartitionCnt, finalized later */ |
| of_PartCnt = rd_kafka_buf_write_i32(rkbuf, 0); |
| PartCnt = 0; |
| last_topic = rktpar->topic; |
| TopicCnt++; |
| } |
| |
| /* Partition */ |
| rd_kafka_buf_write_i32(rkbuf, rktpar->partition); |
| PartCnt++; |
| tot_PartCnt++; |
| } |
| |
| /* Finalize previous PartitionCnt */ |
| if (PartCnt > 0) |
| rd_kafka_buf_update_u32(rkbuf, of_PartCnt, PartCnt); |
| |
| /* Finalize TopicCnt */ |
| rd_kafka_buf_update_u32(rkbuf, of_TopicCnt, TopicCnt); |
| |
| rd_kafka_buf_ApiVersion_set(rkbuf, api_version, 0); |
| |
| rd_rkb_dbg(rkb, TOPIC, "OFFSET", |
| "OffsetFetchRequest(v%d) for %d/%d partition(s)", |
| api_version, tot_PartCnt, parts->cnt); |
| |
| if (tot_PartCnt == 0) { |
| /* No partitions needs OffsetFetch, enqueue empty |
| * response right away. */ |
| rkbuf->rkbuf_replyq = replyq; |
| rkbuf->rkbuf_cb = resp_cb; |
| rkbuf->rkbuf_opaque = opaque; |
| rd_kafka_buf_callback(rkb->rkb_rk, rkb, 0, NULL, rkbuf); |
| return; |
| } |
| |
| |
| |
| rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); |
| } |
| |
| |
| /** |
| * @remark \p offsets may be NULL if \p err is set |
| */ |
| rd_kafka_resp_err_t |
| rd_kafka_handle_OffsetCommit (rd_kafka_t *rk, |
| rd_kafka_broker_t *rkb, |
| rd_kafka_resp_err_t err, |
| rd_kafka_buf_t *rkbuf, |
| rd_kafka_buf_t *request, |
| rd_kafka_topic_partition_list_t *offsets) { |
| const int log_decode_errors = LOG_ERR; |
| int32_t TopicArrayCnt; |
| int16_t ErrorCode = 0, last_ErrorCode = 0; |
| int errcnt = 0; |
| int i; |
| int actions; |
| |
| if (err) |
| goto err; |
| |
| rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt); |
| for (i = 0 ; i < TopicArrayCnt ; i++) { |
| rd_kafkap_str_t topic; |
| char *topic_str; |
| int32_t PartArrayCnt; |
| int j; |
| |
| rd_kafka_buf_read_str(rkbuf, &topic); |
| rd_kafka_buf_read_i32(rkbuf, &PartArrayCnt); |
| |
| RD_KAFKAP_STR_DUPA(&topic_str, &topic); |
| |
| for (j = 0 ; j < PartArrayCnt ; j++) { |
| int32_t partition; |
| rd_kafka_topic_partition_t *rktpar; |
| |
| rd_kafka_buf_read_i32(rkbuf, &partition); |
| rd_kafka_buf_read_i16(rkbuf, &ErrorCode); |
| |
| rktpar = rd_kafka_topic_partition_list_find( |
| offsets, topic_str, partition); |
| |
| if (!rktpar) { |
| /* Received offset for topic/partition we didn't |
| * ask for, this shouldn't really happen. */ |
| continue; |
| } |
| |
| rktpar->err = ErrorCode; |
| if (ErrorCode) { |
| last_ErrorCode = ErrorCode; |
| errcnt++; |
| } |
| } |
| } |
| |
| /* If all partitions failed use error code |
| * from last partition as the global error. */ |
| if (offsets && errcnt == offsets->cnt) |
| err = last_ErrorCode; |
| goto done; |
| |
| err_parse: |
| err = rkbuf->rkbuf_err; |
| |
| err: |
| actions = rd_kafka_err_action( |
| rkb, err, rkbuf, request, |
| |
| RD_KAFKA_ERR_ACTION_PERMANENT, |
| RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE, |
| |
| RD_KAFKA_ERR_ACTION_RETRY, |
| RD_KAFKA_RESP_ERR_GROUP_LOAD_IN_PROGRESS, |
| |
| RD_KAFKA_ERR_ACTION_REFRESH|RD_KAFKA_ERR_ACTION_SPECIAL, |
| RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE, |
| |
| RD_KAFKA_ERR_ACTION_REFRESH|RD_KAFKA_ERR_ACTION_SPECIAL, |
| RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP, |
| |
| RD_KAFKA_ERR_ACTION_REFRESH|RD_KAFKA_ERR_ACTION_RETRY, |
| RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION, |
| |
| RD_KAFKA_ERR_ACTION_REFRESH|RD_KAFKA_ERR_ACTION_RETRY, |
| RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID, |
| |
| RD_KAFKA_ERR_ACTION_RETRY, |
| RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS, |
| |
| RD_KAFKA_ERR_ACTION_PERMANENT, |
| RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE, |
| |
| RD_KAFKA_ERR_ACTION_PERMANENT, |
| RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED, |
| |
| RD_KAFKA_ERR_ACTION_PERMANENT, |
| RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED, |
| |
| RD_KAFKA_ERR_ACTION_END); |
| |
| if (actions & RD_KAFKA_ERR_ACTION_REFRESH && rk->rk_cgrp) { |
| /* Mark coordinator dead or re-query for coordinator. |
| * ..dead() will trigger a re-query. */ |
| if (actions & RD_KAFKA_ERR_ACTION_SPECIAL) |
| rd_kafka_cgrp_coord_dead(rk->rk_cgrp, err, |
| "OffsetCommitRequest failed"); |
| else |
| rd_kafka_cgrp_coord_query(rk->rk_cgrp, |
| "OffsetCommitRequest failed"); |
| } |
| if (actions & RD_KAFKA_ERR_ACTION_RETRY) { |
| if (rd_kafka_buf_retry(rkb, request)) |
| return RD_KAFKA_RESP_ERR__IN_PROGRESS; |
| /* FALLTHRU */ |
| } |
| |
| done: |
| return err; |
| } |
| |
| |
| |
| |
| /** |
| * @brief Send OffsetCommitRequest for a list of partitions. |
| * |
| * @returns 0 if none of the partitions in \p offsets had valid offsets, |
| * else 1. |
| */ |
| int rd_kafka_OffsetCommitRequest (rd_kafka_broker_t *rkb, |
| rd_kafka_cgrp_t *rkcg, |
| int16_t api_version, |
| rd_kafka_topic_partition_list_t *offsets, |
| rd_kafka_replyq_t replyq, |
| rd_kafka_resp_cb_t *resp_cb, |
| void *opaque, const char *reason) { |
| rd_kafka_buf_t *rkbuf; |
| ssize_t of_TopicCnt = -1; |
| int TopicCnt = 0; |
| const char *last_topic = NULL; |
| ssize_t of_PartCnt = -1; |
| int PartCnt = 0; |
| int tot_PartCnt = 0; |
| int i; |
| |
| rd_kafka_assert(NULL, offsets != NULL); |
| |
| rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_OffsetCommit, |
| 1, 100 + (offsets->cnt * 128)); |
| |
| /* ConsumerGroup */ |
| rd_kafka_buf_write_kstr(rkbuf, rkcg->rkcg_group_id); |
| |
| /* v1,v2 */ |
| if (api_version >= 1) { |
| /* ConsumerGroupGenerationId */ |
| rd_kafka_buf_write_i32(rkbuf, rkcg->rkcg_generation_id); |
| /* ConsumerId */ |
| rd_kafka_buf_write_kstr(rkbuf, rkcg->rkcg_member_id); |
| /* v2: RetentionTime */ |
| if (api_version == 2) |
| rd_kafka_buf_write_i64(rkbuf, -1); |
| } |
| |
| /* Sort offsets by topic */ |
| rd_kafka_topic_partition_list_sort_by_topic(offsets); |
| |
| /* TopicArrayCnt: Will be updated when we know the number of topics. */ |
| of_TopicCnt = rd_kafka_buf_write_i32(rkbuf, 0); |
| |
| for (i = 0 ; i < offsets->cnt ; i++) { |
| rd_kafka_topic_partition_t *rktpar = &offsets->elems[i]; |
| |
| /* Skip partitions with invalid offset. */ |
| if (rktpar->offset < 0) |
| continue; |
| |
| if (last_topic == NULL || strcmp(last_topic, rktpar->topic)) { |
| /* New topic */ |
| |
| /* Finalize previous PartitionCnt */ |
| if (PartCnt > 0) |
| rd_kafka_buf_update_u32(rkbuf, of_PartCnt, |
| PartCnt); |
| |
| /* TopicName */ |
| rd_kafka_buf_write_str(rkbuf, rktpar->topic, -1); |
| /* PartitionCnt, finalized later */ |
| of_PartCnt = rd_kafka_buf_write_i32(rkbuf, 0); |
| PartCnt = 0; |
| last_topic = rktpar->topic; |
| TopicCnt++; |
| } |
| |
| /* Partition */ |
| rd_kafka_buf_write_i32(rkbuf, rktpar->partition); |
| PartCnt++; |
| tot_PartCnt++; |
| |
| /* Offset */ |
| rd_kafka_buf_write_i64(rkbuf, rktpar->offset); |
| |
| /* v1: TimeStamp */ |
| if (api_version == 1) |
| rd_kafka_buf_write_i64(rkbuf, -1);// FIXME: retention time |
| |
| /* Metadata */ |
| /* Java client 0.9.0 and broker <0.10.0 can't parse |
| * Null metadata fields, so as a workaround we send an |
| * empty string if it's Null. */ |
| if (!rktpar->metadata) |
| rd_kafka_buf_write_str(rkbuf, "", 0); |
| else |
| rd_kafka_buf_write_str(rkbuf, |
| rktpar->metadata, |
| rktpar->metadata_size); |
| } |
| |
| if (tot_PartCnt == 0) { |
| /* No topic+partitions had valid offsets to commit. */ |
| rd_kafka_replyq_destroy(&replyq); |
| rd_kafka_buf_destroy(rkbuf); |
| return 0; |
| } |
| |
| /* Finalize previous PartitionCnt */ |
| if (PartCnt > 0) |
| rd_kafka_buf_update_u32(rkbuf, of_PartCnt, PartCnt); |
| |
| /* Finalize TopicCnt */ |
| rd_kafka_buf_update_u32(rkbuf, of_TopicCnt, TopicCnt); |
| |
| rd_kafka_buf_ApiVersion_set(rkbuf, api_version, 0); |
| |
| rd_rkb_dbg(rkb, TOPIC, "OFFSET", |
| "Enqueue OffsetCommitRequest(v%d, %d/%d partition(s))): %s", |
| api_version, tot_PartCnt, offsets->cnt, reason); |
| |
| rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); |
| |
| return 1; |
| |
| } |
| |
| |
| |
| /** |
| * @brief Write "consumer" protocol type MemberState for SyncGroupRequest to |
| * enveloping buffer \p rkbuf. |
| */ |
| static void rd_kafka_group_MemberState_consumer_write ( |
| rd_kafka_buf_t *env_rkbuf, |
| const rd_kafka_group_member_t *rkgm) { |
| rd_kafka_buf_t *rkbuf; |
| int i; |
| const char *last_topic = NULL; |
| size_t of_TopicCnt; |
| ssize_t of_PartCnt = -1; |
| int TopicCnt = 0; |
| int PartCnt = 0; |
| rd_slice_t slice; |
| |
| rkbuf = rd_kafka_buf_new(1, 100); |
| rd_kafka_buf_write_i16(rkbuf, 0); /* Version */ |
| of_TopicCnt = rd_kafka_buf_write_i32(rkbuf, 0); /* Updated later */ |
| for (i = 0 ; i < rkgm->rkgm_assignment->cnt ; i++) { |
| const rd_kafka_topic_partition_t *rktpar; |
| |
| rktpar = &rkgm->rkgm_assignment->elems[i]; |
| |
| if (!last_topic || strcmp(last_topic, |
| rktpar->topic)) { |
| if (last_topic) |
| /* Finalize previous PartitionCnt */ |
| rd_kafka_buf_update_i32(rkbuf, of_PartCnt, |
| PartCnt); |
| rd_kafka_buf_write_str(rkbuf, rktpar->topic, -1); |
| /* Updated later */ |
| of_PartCnt = rd_kafka_buf_write_i32(rkbuf, 0); |
| PartCnt = 0; |
| last_topic = rktpar->topic; |
| TopicCnt++; |
| } |
| |
| rd_kafka_buf_write_i32(rkbuf, rktpar->partition); |
| PartCnt++; |
| } |
| |
| if (of_PartCnt != -1) |
| rd_kafka_buf_update_i32(rkbuf, of_PartCnt, PartCnt); |
| rd_kafka_buf_update_i32(rkbuf, of_TopicCnt, TopicCnt); |
| |
| rd_kafka_buf_write_kbytes(rkbuf, rkgm->rkgm_userdata); |
| |
| /* Get pointer to binary buffer */ |
| rd_slice_init_full(&slice, &rkbuf->rkbuf_buf); |
| |
| /* Write binary buffer as Kafka Bytes to enveloping buffer. */ |
| rd_kafka_buf_write_i32(env_rkbuf, (int32_t)rd_slice_remains(&slice)); |
| rd_buf_write_slice(&env_rkbuf->rkbuf_buf, &slice); |
| |
| rd_kafka_buf_destroy(rkbuf); |
| } |
| |
| /** |
| * Send SyncGroupRequest |
| */ |
| void rd_kafka_SyncGroupRequest (rd_kafka_broker_t *rkb, |
| const rd_kafkap_str_t *group_id, |
| int32_t generation_id, |
| const rd_kafkap_str_t *member_id, |
| const rd_kafka_group_member_t |
| *assignments, |
| int assignment_cnt, |
| rd_kafka_replyq_t replyq, |
| rd_kafka_resp_cb_t *resp_cb, |
| void *opaque) { |
| rd_kafka_buf_t *rkbuf; |
| int i; |
| |
| rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_SyncGroup, |
| 1, |
| RD_KAFKAP_STR_SIZE(group_id) + |
| 4 /* GenerationId */ + |
| RD_KAFKAP_STR_SIZE(member_id) + |
| 4 /* array size group_assignment */ + |
| (assignment_cnt * 100/*guess*/)); |
| rd_kafka_buf_write_kstr(rkbuf, group_id); |
| rd_kafka_buf_write_i32(rkbuf, generation_id); |
| rd_kafka_buf_write_kstr(rkbuf, member_id); |
| rd_kafka_buf_write_i32(rkbuf, assignment_cnt); |
| |
| for (i = 0 ; i < assignment_cnt ; i++) { |
| const rd_kafka_group_member_t *rkgm = &assignments[i]; |
| |
| rd_kafka_buf_write_kstr(rkbuf, rkgm->rkgm_member_id); |
| rd_kafka_group_MemberState_consumer_write(rkbuf, rkgm); |
| } |
| |
| /* This is a blocking request */ |
| rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_BLOCKING; |
| rkbuf->rkbuf_ts_timeout = rd_clock() + |
| (rkb->rkb_rk->rk_conf.group_session_timeout_ms * 1000) + |
| (3*1000*1000/* 3s grace period*/); |
| |
| rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); |
| } |
| |
| /** |
| * Handler for SyncGroup responses |
| * opaque must be the cgrp handle. |
| */ |
| void rd_kafka_handle_SyncGroup (rd_kafka_t *rk, |
| rd_kafka_broker_t *rkb, |
| rd_kafka_resp_err_t err, |
| rd_kafka_buf_t *rkbuf, |
| rd_kafka_buf_t *request, |
| void *opaque) { |
| rd_kafka_cgrp_t *rkcg = opaque; |
| const int log_decode_errors = LOG_ERR; |
| int16_t ErrorCode = 0; |
| rd_kafkap_bytes_t MemberState = RD_ZERO_INIT; |
| int actions; |
| |
| if (rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC) { |
| rd_kafka_dbg(rkb->rkb_rk, CGRP, "SYNCGROUP", |
| "SyncGroup response: discarding outdated request " |
| "(now in join-state %s)", |
| rd_kafka_cgrp_join_state_names[rkcg-> |
| rkcg_join_state]); |
| return; |
| } |
| |
| if (err) { |
| ErrorCode = err; |
| goto err; |
| } |
| |
| rd_kafka_buf_read_i16(rkbuf, &ErrorCode); |
| rd_kafka_buf_read_bytes(rkbuf, &MemberState); |
| |
| err: |
| actions = rd_kafka_err_action(rkb, ErrorCode, rkbuf, request, |
| RD_KAFKA_ERR_ACTION_END); |
| |
| if (actions & RD_KAFKA_ERR_ACTION_REFRESH) { |
| /* Re-query for coordinator */ |
| rd_kafka_cgrp_op(rkcg, NULL, RD_KAFKA_NO_REPLYQ, |
| RD_KAFKA_OP_COORD_QUERY, |
| ErrorCode); |
| /* FALLTHRU */ |
| } |
| |
| rd_kafka_dbg(rkb->rkb_rk, CGRP, "SYNCGROUP", |
| "SyncGroup response: %s (%d bytes of MemberState data)", |
| rd_kafka_err2str(ErrorCode), |
| RD_KAFKAP_BYTES_LEN(&MemberState)); |
| |
| if (ErrorCode == RD_KAFKA_RESP_ERR__DESTROY) |
| return; /* Termination */ |
| |
| rd_kafka_cgrp_handle_SyncGroup(rkcg, rkb, ErrorCode, &MemberState); |
| |
| return; |
| |
| err_parse: |
| ErrorCode = rkbuf->rkbuf_err; |
| goto err; |
| } |
| |
| |
| /** |
| * Send JoinGroupRequest |
| */ |
| void rd_kafka_JoinGroupRequest (rd_kafka_broker_t *rkb, |
| const rd_kafkap_str_t *group_id, |
| const rd_kafkap_str_t *member_id, |
| const rd_kafkap_str_t *protocol_type, |
| const rd_list_t *topics, |
| rd_kafka_replyq_t replyq, |
| rd_kafka_resp_cb_t *resp_cb, |
| void *opaque) { |
| rd_kafka_buf_t *rkbuf; |
| rd_kafka_t *rk = rkb->rkb_rk; |
| rd_kafka_assignor_t *rkas; |
| int i; |
| |
| rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_JoinGroup, |
| 1, |
| RD_KAFKAP_STR_SIZE(group_id) + |
| 4 /* sessionTimeoutMs */ + |
| RD_KAFKAP_STR_SIZE(member_id) + |
| RD_KAFKAP_STR_SIZE(protocol_type) + |
| 4 /* array count GroupProtocols */ + |
| (rd_list_cnt(topics) * 100)); |
| rd_kafka_buf_write_kstr(rkbuf, group_id); |
| rd_kafka_buf_write_i32(rkbuf, rk->rk_conf.group_session_timeout_ms); |
| rd_kafka_buf_write_kstr(rkbuf, member_id); |
| rd_kafka_buf_write_kstr(rkbuf, protocol_type); |
| rd_kafka_buf_write_i32(rkbuf, rk->rk_conf.enabled_assignor_cnt); |
| |
| RD_LIST_FOREACH(rkas, &rk->rk_conf.partition_assignors, i) { |
| rd_kafkap_bytes_t *member_metadata; |
| if (!rkas->rkas_enabled) |
| continue; |
| rd_kafka_buf_write_kstr(rkbuf, rkas->rkas_protocol_name); |
| member_metadata = rkas->rkas_get_metadata_cb(rkas, topics); |
| rd_kafka_buf_write_kbytes(rkbuf, member_metadata); |
| rd_kafkap_bytes_destroy(member_metadata); |
| } |
| |
| /* This is a blocking request */ |
| rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_BLOCKING; |
| rkbuf->rkbuf_ts_timeout = rd_clock() + |
| (rk->rk_conf.group_session_timeout_ms * 1000) + |
| (3*1000*1000/* 3s grace period*/); |
| |
| rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); |
| } |
| |
| |
| |
| |
| |
| |
| /** |
| * Send LeaveGroupRequest |
| */ |
| void rd_kafka_LeaveGroupRequest (rd_kafka_broker_t *rkb, |
| const rd_kafkap_str_t *group_id, |
| const rd_kafkap_str_t *member_id, |
| rd_kafka_replyq_t replyq, |
| rd_kafka_resp_cb_t *resp_cb, |
| void *opaque) { |
| rd_kafka_buf_t *rkbuf; |
| |
| rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_LeaveGroup, |
| 1, |
| RD_KAFKAP_STR_SIZE(group_id) + |
| RD_KAFKAP_STR_SIZE(member_id)); |
| rd_kafka_buf_write_kstr(rkbuf, group_id); |
| rd_kafka_buf_write_kstr(rkbuf, member_id); |
| |
| rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); |
| } |
| |
| |
| /** |
| * Handler for LeaveGroup responses |
| * opaque must be the cgrp handle. |
| */ |
| void rd_kafka_handle_LeaveGroup (rd_kafka_t *rk, |
| rd_kafka_broker_t *rkb, |
| rd_kafka_resp_err_t err, |
| rd_kafka_buf_t *rkbuf, |
| rd_kafka_buf_t *request, |
| void *opaque) { |
| rd_kafka_cgrp_t *rkcg = opaque; |
| const int log_decode_errors = LOG_ERR; |
| int16_t ErrorCode = 0; |
| int actions; |
| |
| if (err) { |
| ErrorCode = err; |
| goto err; |
| } |
| |
| rd_kafka_buf_read_i16(rkbuf, &ErrorCode); |
| |
| |
| err: |
| actions = rd_kafka_err_action(rkb, ErrorCode, rkbuf, request, |
| RD_KAFKA_ERR_ACTION_END); |
| |
| if (actions & RD_KAFKA_ERR_ACTION_REFRESH) { |
| /* Re-query for coordinator */ |
| rd_kafka_cgrp_op(rkcg, NULL, RD_KAFKA_NO_REPLYQ, |
| RD_KAFKA_OP_COORD_QUERY, ErrorCode); |
| /* Schedule a retry */ |
| rd_kafka_buf_keep(request); |
| rd_kafka_broker_buf_retry(request->rkbuf_rkb, request); |
| return; |
| } |
| |
| if (ErrorCode) |
| rd_kafka_dbg(rkb->rkb_rk, CGRP, "LEAVEGROUP", |
| "LeaveGroup response: %s", |
| rd_kafka_err2str(ErrorCode)); |
| |
| err_parse: |
| ErrorCode = rkbuf->rkbuf_err; |
| goto err; |
| } |
| |
| |
| |
| |
| |
| |
| /** |
| * Send HeartbeatRequest |
| */ |
| void rd_kafka_HeartbeatRequest (rd_kafka_broker_t *rkb, |
| const rd_kafkap_str_t *group_id, |
| int32_t generation_id, |
| const rd_kafkap_str_t *member_id, |
| rd_kafka_replyq_t replyq, |
| rd_kafka_resp_cb_t *resp_cb, |
| void *opaque) { |
| rd_kafka_buf_t *rkbuf; |
| |
| rd_rkb_dbg(rkb, CGRP, "HEARTBEAT", |
| "Heartbeat for group \"%s\" generation id %"PRId32, |
| group_id->str, generation_id); |
| |
| rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_Heartbeat, |
| 1, |
| RD_KAFKAP_STR_SIZE(group_id) + |
| 4 /* GenerationId */ + |
| RD_KAFKAP_STR_SIZE(member_id)); |
| |
| rd_kafka_buf_write_kstr(rkbuf, group_id); |
| rd_kafka_buf_write_i32(rkbuf, generation_id); |
| rd_kafka_buf_write_kstr(rkbuf, member_id); |
| |
| rkbuf->rkbuf_ts_timeout = rd_clock() + |
| (rkb->rkb_rk->rk_conf.group_session_timeout_ms * 1000); |
| |
| rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); |
| } |
| |
| |
| |
| |
| /** |
| * Send ListGroupsRequest |
| */ |
| void rd_kafka_ListGroupsRequest (rd_kafka_broker_t *rkb, |
| rd_kafka_replyq_t replyq, |
| rd_kafka_resp_cb_t *resp_cb, |
| void *opaque) { |
| rd_kafka_buf_t *rkbuf; |
| |
| rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_ListGroups, 0, 0); |
| |
| rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); |
| } |
| |
| |
| /** |
| * Send DescribeGroupsRequest |
| */ |
| void rd_kafka_DescribeGroupsRequest (rd_kafka_broker_t *rkb, |
| const char **groups, int group_cnt, |
| rd_kafka_replyq_t replyq, |
| rd_kafka_resp_cb_t *resp_cb, |
| void *opaque) { |
| rd_kafka_buf_t *rkbuf; |
| |
| rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_DescribeGroups, |
| 1, 32*group_cnt); |
| |
| rd_kafka_buf_write_i32(rkbuf, group_cnt); |
| while (group_cnt-- > 0) |
| rd_kafka_buf_write_str(rkbuf, groups[group_cnt], -1); |
| |
| rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); |
| } |
| |
| |
| |
| |
| /** |
| * @brief Generic handler for Metadata responses |
| * |
| * @locality rdkafka main thread |
| */ |
| static void rd_kafka_handle_Metadata (rd_kafka_t *rk, |
| rd_kafka_broker_t *rkb, |
| rd_kafka_resp_err_t err, |
| rd_kafka_buf_t *rkbuf, |
| rd_kafka_buf_t *request, |
| void *opaque) { |
| rd_kafka_op_t *rko = opaque; /* Possibly NULL */ |
| struct rd_kafka_metadata *md = NULL; |
| const rd_list_t *topics = request->rkbuf_u.Metadata.topics; |
| |
| rd_kafka_assert(NULL, err == RD_KAFKA_RESP_ERR__DESTROY || |
| thrd_is_current(rk->rk_thread)); |
| |
| /* Avoid metadata updates when we're terminating. */ |
| if (rd_kafka_terminating(rkb->rkb_rk)) |
| err = RD_KAFKA_RESP_ERR__DESTROY; |
| |
| if (unlikely(err)) { |
| if (err == RD_KAFKA_RESP_ERR__DESTROY) { |
| /* Terminating */ |
| goto done; |
| } |
| |
| /* FIXME: handle errors */ |
| rd_rkb_log(rkb, LOG_WARNING, "METADATA", |
| "Metadata request failed: %s (%dms)", |
| rd_kafka_err2str(err), |
| (int)(request->rkbuf_ts_sent/1000)); |
| } else { |
| |
| if (!topics) |
| rd_rkb_dbg(rkb, METADATA, "METADATA", |
| "===== Received metadata: %s =====", |
| request->rkbuf_u.Metadata.reason); |
| else |
| rd_rkb_dbg(rkb, METADATA, "METADATA", |
| "===== Received metadata " |
| "(for %d requested topics): %s =====", |
| rd_list_cnt(topics), |
| request->rkbuf_u.Metadata.reason); |
| |
| md = rd_kafka_parse_Metadata(rkb, request, rkbuf); |
| if (!md) { |
| if (rd_kafka_buf_retry(rkb, request)) |
| return; |
| err = RD_KAFKA_RESP_ERR__BAD_MSG; |
| } |
| } |
| |
| if (rko && rko->rko_replyq.q) { |
| /* Reply to metadata requester, passing on the metadata. |
| * Reuse requesting rko for the reply. */ |
| rko->rko_err = err; |
| rko->rko_u.metadata.md = md; |
| |
| rd_kafka_replyq_enq(&rko->rko_replyq, rko, 0); |
| rko = NULL; |
| } else { |
| if (md) |
| rd_free(md); |
| } |
| |
| done: |
| if (rko) |
| rd_kafka_op_destroy(rko); |
| } |
| |
| |
| |
| /** |
| * @brief Construct MetadataRequest (does not send) |
| * |
| * \p topics is a list of topic names (char *) to request. |
| * |
| * !topics - only request brokers (if supported by broker, else |
| * all topics) |
| * topics.cnt==0 - all topics in cluster are requested |
| * topics.cnt >0 - only specified topics are requested |
| * |
| * @param reason - metadata request reason |
| * @param rko - (optional) rko with replyq for handling response. |
| * Specifying an rko forces a metadata request even if |
| * there is already a matching one in-transit. |
| * |
| * If full metadata for all topics is requested (or all brokers, which |
| * results in all-topics on older brokers) and there is already a full request |
| * in transit then this function will return RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS |
| * otherwise RD_KAFKA_RESP_ERR_NO_ERROR. If \p rko is non-NULL the request |
| * is sent regardless. |
| */ |
| rd_kafka_resp_err_t |
| rd_kafka_MetadataRequest (rd_kafka_broker_t *rkb, |
| const rd_list_t *topics, const char *reason, |
| rd_kafka_op_t *rko) { |
| rd_kafka_buf_t *rkbuf; |
| int16_t ApiVersion = 0; |
| int features; |
| int topic_cnt = topics ? rd_list_cnt(topics) : 0; |
| int *full_incr = NULL; |
| |
| ApiVersion = rd_kafka_broker_ApiVersion_supported(rkb, |
| RD_KAFKAP_Metadata, |
| 0, 2, |
| &features); |
| |
| rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_Metadata, 1, |
| 4 + (50 * topic_cnt)); |
| |
| if (!reason) |
| reason = ""; |
| |
| rkbuf->rkbuf_u.Metadata.reason = rd_strdup(reason); |
| |
| if (!topics && ApiVersion >= 1) { |
| /* a null(0) array (in the protocol) represents no topics */ |
| rd_kafka_buf_write_i32(rkbuf, 0); |
| rd_rkb_dbg(rkb, METADATA, "METADATA", |
| "Request metadata for brokers only: %s", reason); |
| full_incr = &rkb->rkb_rk->rk_metadata_cache. |
| rkmc_full_brokers_sent; |
| |
| } else { |
| if (topic_cnt == 0 && !rko) |
| full_incr = &rkb->rkb_rk->rk_metadata_cache. |
| rkmc_full_topics_sent; |
| |
| if (topic_cnt == 0 && ApiVersion >= 1) |
| rd_kafka_buf_write_i32(rkbuf, -1); /* Null: all topics*/ |
| else |
| rd_kafka_buf_write_i32(rkbuf, topic_cnt); |
| |
| if (topic_cnt == 0) { |
| rkbuf->rkbuf_u.Metadata.all_topics = 1; |
| rd_rkb_dbg(rkb, METADATA, "METADATA", |
| "Request metadata for all topics: " |
| "%s", reason); |
| } else |
| rd_rkb_dbg(rkb, METADATA, "METADATA", |
| "Request metadata for %d topic(s): " |
| "%s", topic_cnt, reason); |
| } |
| |
| if (full_incr) { |
| /* Avoid multiple outstanding full requests |
| * (since they are redundant and side-effect-less). |
| * Forced requests (app using metadata() API) are passed |
| * through regardless. */ |
| |
| mtx_lock(&rkb->rkb_rk->rk_metadata_cache. |
| rkmc_full_lock); |
| if (*full_incr > 0 && (!rko || !rko->rko_u.metadata.force)) { |
| mtx_unlock(&rkb->rkb_rk->rk_metadata_cache. |
| rkmc_full_lock); |
| rd_rkb_dbg(rkb, METADATA, "METADATA", |
| "Skipping metadata request: %s: " |
| "full request already in-transit", |
| reason); |
| rd_kafka_buf_destroy(rkbuf); |
| return RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS; |
| } |
| |
| (*full_incr)++; |
| mtx_unlock(&rkb->rkb_rk->rk_metadata_cache. |
| rkmc_full_lock); |
| rkbuf->rkbuf_u.Metadata.decr = full_incr; |
| rkbuf->rkbuf_u.Metadata.decr_lock = &rkb->rkb_rk-> |
| rk_metadata_cache.rkmc_full_lock; |
| } |
| |
| |
| if (topic_cnt > 0) { |
| char *topic; |
| int i; |
| |
| /* Maintain a copy of the topics list so we can purge |
| * hints from the metadata cache on error. */ |
| rkbuf->rkbuf_u.Metadata.topics = |
| rd_list_copy(topics, rd_list_string_copy, NULL); |
| |
| RD_LIST_FOREACH(topic, topics, i) |
| rd_kafka_buf_write_str(rkbuf, topic, -1); |
| |
| } |
| |
| rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); |
| |
| /* Metadata requests are part of the important control plane |
| * and should go before other requests (Produce, Fetch, etc). */ |
| rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_FLASH; |
| |
| rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, |
| /* Handle response thru rk_ops, |
| * but forward parsed result to |
| * rko's replyq when done. */ |
| RD_KAFKA_REPLYQ(rkb->rkb_rk-> |
| rk_ops, 0), |
| rd_kafka_handle_Metadata, rko); |
| |
| return RD_KAFKA_RESP_ERR_NO_ERROR; |
| } |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| /** |
| * @brief Parses and handles ApiVersion reply. |
| * |
| * @param apis will be allocated, populated and sorted |
| * with broker's supported APIs. |
| * @param api_cnt will be set to the number of elements in \p *apis |
| |
| * @returns 0 on success, else an error. |
| */ |
| rd_kafka_resp_err_t |
| rd_kafka_handle_ApiVersion (rd_kafka_t *rk, |
| rd_kafka_broker_t *rkb, |
| rd_kafka_resp_err_t err, |
| rd_kafka_buf_t *rkbuf, |
| rd_kafka_buf_t *request, |
| struct rd_kafka_ApiVersion **apis, |
| size_t *api_cnt) { |
| const int log_decode_errors = LOG_ERR; |
| int actions; |
| int32_t ApiArrayCnt; |
| int16_t ErrorCode; |
| int i = 0; |
| |
| *apis = NULL; |
| |
| if (err) |
| goto err; |
| |
| rd_kafka_buf_read_i16(rkbuf, &ErrorCode); |
| if ((err = ErrorCode)) |
| goto err; |
| |
| rd_kafka_buf_read_i32(rkbuf, &ApiArrayCnt); |
| if (ApiArrayCnt > 1000) |
| rd_kafka_buf_parse_fail(rkbuf, |
| "ApiArrayCnt %"PRId32" out of range", |
| ApiArrayCnt); |
| |
| rd_rkb_dbg(rkb, FEATURE, "APIVERSION", |
| "Broker API support:"); |
| |
| *apis = malloc(sizeof(**apis) * ApiArrayCnt); |
| |
| for (i = 0 ; i < ApiArrayCnt ; i++) { |
| struct rd_kafka_ApiVersion *api = &(*apis)[i]; |
| |
| rd_kafka_buf_read_i16(rkbuf, &api->ApiKey); |
| rd_kafka_buf_read_i16(rkbuf, &api->MinVer); |
| rd_kafka_buf_read_i16(rkbuf, &api->MaxVer); |
| |
| rd_rkb_dbg(rkb, FEATURE, "APIVERSION", |
| " ApiKey %s (%hd) Versions %hd..%hd", |
| rd_kafka_ApiKey2str(api->ApiKey), |
| api->ApiKey, api->MinVer, api->MaxVer); |
| } |
| |
| *api_cnt = ApiArrayCnt; |
| qsort(*apis, *api_cnt, sizeof(**apis), rd_kafka_ApiVersion_key_cmp); |
| |
| goto done; |
| |
| err_parse: |
| err = rkbuf->rkbuf_err; |
| err: |
| if (*apis) |
| rd_free(*apis); |
| |
| actions = rd_kafka_err_action( |
| rkb, err, rkbuf, request, |
| RD_KAFKA_ERR_ACTION_END); |
| |
| if (actions & RD_KAFKA_ERR_ACTION_RETRY) { |
| if (rd_kafka_buf_retry(rkb, request)) |
| return RD_KAFKA_RESP_ERR__IN_PROGRESS; |
| /* FALLTHRU */ |
| } |
| |
| done: |
| return err; |
| } |
| |
| |
| |
| /** |
| * Send ApiVersionRequest (KIP-35) |
| */ |
| void rd_kafka_ApiVersionRequest (rd_kafka_broker_t *rkb, |
| rd_kafka_replyq_t replyq, |
| rd_kafka_resp_cb_t *resp_cb, |
| void *opaque, int flash_msg) { |
| rd_kafka_buf_t *rkbuf; |
| |
| rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_ApiVersion, 1, 4); |
| rkbuf->rkbuf_flags |= (flash_msg ? RD_KAFKA_OP_F_FLASH : 0); |
| rd_kafka_buf_write_i32(rkbuf, 0); /* Empty array: request all APIs */ |
| |
| /* Non-supporting brokers will tear down the connection when they |
| * receive an unknown API request, so dont retry request on failure. */ |
| rkbuf->rkbuf_retries = RD_KAFKA_BUF_NO_RETRIES; |
| |
| /* 0.9.0.x brokers will not close the connection on unsupported |
| * API requests, so we minimize the timeout for the request. |
| * This is a regression on the broker part. */ |
| rkbuf->rkbuf_ts_timeout = rd_clock() + (rkb->rkb_rk->rk_conf.api_version_request_timeout_ms * 1000); |
| |
| if (replyq.q) |
| rd_kafka_broker_buf_enq_replyq(rkb, |
| rkbuf, replyq, resp_cb, opaque); |
| else /* in broker thread */ |
| rd_kafka_broker_buf_enq1(rkb, rkbuf, resp_cb, opaque); |
| } |
| |
| |
| /** |
| * Send SaslHandshakeRequest (KIP-43) |
| */ |
| void rd_kafka_SaslHandshakeRequest (rd_kafka_broker_t *rkb, |
| const char *mechanism, |
| rd_kafka_replyq_t replyq, |
| rd_kafka_resp_cb_t *resp_cb, |
| void *opaque, int flash_msg) { |
| rd_kafka_buf_t *rkbuf; |
| int mechlen = (int)strlen(mechanism); |
| |
| rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_SaslHandshake, |
| 1, RD_KAFKAP_STR_SIZE0(mechlen)); |
| rkbuf->rkbuf_flags |= (flash_msg ? RD_KAFKA_OP_F_FLASH : 0); |
| rd_kafka_buf_write_str(rkbuf, mechanism, mechlen); |
| |
| /* Non-supporting brokers will tear down the conneciton when they |
| * receive an unknown API request or where the SASL GSSAPI |
| * token type is not recognized, so dont retry request on failure. */ |
| rkbuf->rkbuf_retries = RD_KAFKA_BUF_NO_RETRIES; |
| |
| /* 0.9.0.x brokers will not close the connection on unsupported |
| * API requests, so we minimize the timeout of the request. |
| * This is a regression on the broker part. */ |
| if (!rkb->rkb_rk->rk_conf.api_version_request && |
| rkb->rkb_rk->rk_conf.socket_timeout_ms > 10*1000) |
| rkbuf->rkbuf_ts_timeout = rd_clock() + (10 * 1000 * 1000); |
| |
| if (replyq.q) |
| rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, |
| resp_cb, opaque); |
| else /* in broker thread */ |
| rd_kafka_broker_buf_enq1(rkb, rkbuf, resp_cb, opaque); |
| } |
| |
| |
| |
| |
| /** |
| * @brief Parses a Produce reply. |
| * @returns 0 on success or an error code on failure. |
| * @locality broker thread |
| */ |
| static rd_kafka_resp_err_t |
| rd_kafka_handle_Produce_parse (rd_kafka_broker_t *rkb, |
| rd_kafka_toppar_t *rktp, |
| rd_kafka_buf_t *rkbuf, |
| rd_kafka_buf_t *request, |
| int64_t *offsetp, |
| int64_t *timestampp) { |
| int32_t TopicArrayCnt; |
| int32_t PartitionArrayCnt; |
| struct { |
| int32_t Partition; |
| int16_t ErrorCode; |
| int64_t Offset; |
| } hdr; |
| const int log_decode_errors = LOG_ERR; |
| |
| rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt); |
| if (TopicArrayCnt != 1) |
| goto err; |
| |
| /* Since we only produce to one single topic+partition in each |
| * request we assume that the reply only contains one topic+partition |
| * and that it is the same that we requested. |
| * If not the broker is buggy. */ |
| rd_kafka_buf_skip_str(rkbuf); |
| rd_kafka_buf_read_i32(rkbuf, &PartitionArrayCnt); |
| |
| if (PartitionArrayCnt != 1) |
| goto err; |
| |
| rd_kafka_buf_read_i32(rkbuf, &hdr.Partition); |
| rd_kafka_buf_read_i16(rkbuf, &hdr.ErrorCode); |
| rd_kafka_buf_read_i64(rkbuf, &hdr.Offset); |
| |
| *offsetp = hdr.Offset; |
| |
| *timestampp = -1; |
| if (request->rkbuf_reqhdr.ApiVersion >= 2) { |
| rd_kafka_buf_read_i64(rkbuf, timestampp); |
| } |
| |
| if (request->rkbuf_reqhdr.ApiVersion >= 1) { |
| int32_t Throttle_Time; |
| rd_kafka_buf_read_i32(rkbuf, &Throttle_Time); |
| |
| rd_kafka_op_throttle_time(rkb, rkb->rkb_rk->rk_rep, |
| Throttle_Time); |
| } |
| |
| |
| return hdr.ErrorCode; |
| |
| err_parse: |
| return rkbuf->rkbuf_err; |
| err: |
| return RD_KAFKA_RESP_ERR__BAD_MSG; |
| } |
| |
| |
| /** |
| * @brief Handle ProduceResponse |
| * |
| * @locality broker thread |
| */ |
| static void rd_kafka_handle_Produce (rd_kafka_t *rk, |
| rd_kafka_broker_t *rkb, |
| rd_kafka_resp_err_t err, |
| rd_kafka_buf_t *reply, |
| rd_kafka_buf_t *request, |
| void *opaque) { |
| shptr_rd_kafka_toppar_t *s_rktp = opaque; /* from ProduceRequest() */ |
| rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp); |
| int64_t offset = RD_KAFKA_OFFSET_INVALID; |
| int64_t timestamp = -1; |
| |
| /* Parse Produce reply (unless the request errored) */ |
| if (!err && reply) |
| err = rd_kafka_handle_Produce_parse(rkb, rktp, |
| reply, request, |
| &offset, ×tamp); |
| |
| |
| if (likely(!err)) { |
| rd_rkb_dbg(rkb, MSG, "MSGSET", |
| "%s [%"PRId32"]: MessageSet with %i message(s) " |
| "delivered", |
| rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition, |
| rd_atomic32_get(&request->rkbuf_msgq.rkmq_msg_cnt)); |
| |
| } else { |
| /* Error */ |
| int actions; |
| |
| actions = rd_kafka_err_action( |
| rkb, err, reply, request, |
| |
| RD_KAFKA_ERR_ACTION_REFRESH, |
| RD_KAFKA_RESP_ERR__TRANSPORT, |
| |
| RD_KAFKA_ERR_ACTION_REFRESH, |
| RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART, |
| |
| RD_KAFKA_ERR_ACTION_END); |
| |
| rd_rkb_dbg(rkb, MSG, "MSGSET", |
| "%s [%"PRId32"]: MessageSet with %i message(s) " |
| "encountered error: %s (actions 0x%x)", |
| rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition, |
| rd_atomic32_get(&request->rkbuf_msgq.rkmq_msg_cnt), |
| rd_kafka_err2str(err), actions); |
| |
| /* NOTE: REFRESH implies a later retry, which does NOT affect |
| * the retry count since refresh-errors are considered |
| * to be stale metadata rather than temporary errors. |
| * |
| * This is somewhat problematic since it may cause |
| * duplicate messages even with retries=0 if the |
| * ProduceRequest made it to the broker but only the |
| * response was lost due to network connectivity issues. |
| * That problem will be sorted when EoS is implemented. |
| */ |
| if (actions & RD_KAFKA_ERR_ACTION_REFRESH) { |
| /* Request metadata information update */ |
| rd_kafka_toppar_leader_unavailable(rktp, |
| "produce", err); |
| |
| /* Move messages (in the rkbuf) back to the partition's |
| * queue head. They will be resent when a new leader |
| * is delegated. */ |
| rd_kafka_toppar_insert_msgq(rktp, &request->rkbuf_msgq); |
| |
| /* No need for fallthru here since the request |
| * no longer has any messages associated with it. */ |
| goto done; |
| } |
| |
| if ((actions & RD_KAFKA_ERR_ACTION_RETRY) && |
| rd_kafka_buf_retry(rkb, request)) |
| return; /* Scheduled for retry */ |
| |
| /* Refresh implies a later retry through other means */ |
| if (actions & RD_KAFKA_ERR_ACTION_REFRESH) |
| goto done; |
| |
| /* Translate request-level timeout error code |
| * to message-level timeout error code. */ |
| if (err == RD_KAFKA_RESP_ERR__TIMED_OUT) |
| err = RD_KAFKA_RESP_ERR__MSG_TIMED_OUT; |
| |
| /* Fatal errors: no message transmission retries */ |
| /* FALLTHRU */ |
| } |
| |
| /* Propagate assigned offset and timestamp back to app. */ |
| if (likely(offset != RD_KAFKA_OFFSET_INVALID)) { |
| rd_kafka_msg_t *rkm; |
| if (rktp->rktp_rkt->rkt_conf.produce_offset_report) { |
| /* produce.offset.report: each message */ |
| TAILQ_FOREACH(rkm, &request->rkbuf_msgq.rkmq_msgs, |
| rkm_link) { |
| rkm->rkm_offset = offset++; |
| if (timestamp != -1) { |
| rkm->rkm_timestamp = timestamp; |
| rkm->rkm_tstype = RD_KAFKA_MSG_ATTR_LOG_APPEND_TIME; |
| } |
| } |
| } else { |
| /* Last message in each batch */ |
| rkm = TAILQ_LAST(&request->rkbuf_msgq.rkmq_msgs, |
| rd_kafka_msg_head_s); |
| rkm->rkm_offset = offset + |
| rd_atomic32_get(&request->rkbuf_msgq. |
| rkmq_msg_cnt) - 1; |
| if (timestamp != -1) { |
| rkm->rkm_timestamp = timestamp; |
| rkm->rkm_tstype = RD_KAFKA_MSG_ATTR_LOG_APPEND_TIME; |
| } |
| } |
| } |
| |
| /* Enqueue messages for delivery report */ |
| rd_kafka_dr_msgq(rktp->rktp_rkt, &request->rkbuf_msgq, err); |
| |
| done: |
| rd_kafka_toppar_destroy(s_rktp); /* from ProduceRequest() */ |
| } |
| |
| |
| /** |
| * @brief Send ProduceRequest for messages in toppar queue. |
| * |
| * @returns the number of messages included, or 0 on error / no messages. |
| * |
| * @locality broker thread |
| */ |
| int rd_kafka_ProduceRequest (rd_kafka_broker_t *rkb, rd_kafka_toppar_t *rktp) { |
| rd_kafka_buf_t *rkbuf; |
| rd_kafka_itopic_t *rkt = rktp->rktp_rkt; |
| size_t MessageSetSize = 0; |
| int cnt; |
| |
| /** |
| * Create ProduceRequest with as many messages from the toppar |
| * transmit queue as possible. |
| */ |
| rkbuf = rd_kafka_msgset_create_ProduceRequest(rkb, rktp, |
| &MessageSetSize); |
| if (unlikely(!rkbuf)) |
| return 0; |
| |
| cnt = rd_atomic32_get(&rkbuf->rkbuf_msgq.rkmq_msg_cnt); |
| rd_dassert(cnt > 0); |
| |
| rd_atomic64_add(&rktp->rktp_c.tx_msgs, cnt); |
| rd_atomic64_add(&rktp->rktp_c.tx_bytes, MessageSetSize); |
| |
| if (!rkt->rkt_conf.required_acks) |
| rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_NO_RESPONSE; |
| |
| /* Use timeout from first message. */ |
| rkbuf->rkbuf_ts_timeout = |
| TAILQ_FIRST(&rkbuf->rkbuf_msgq.rkmq_msgs)->rkm_ts_timeout; |
| |
| rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, |
| RD_KAFKA_NO_REPLYQ, |
| rd_kafka_handle_Produce, |
| /* toppar ref for handle_Produce() */ |
| rd_kafka_toppar_keep(rktp)); |
| |
| return cnt; |
| } |