| /* |
| * librdkafka - Apache Kafka C library |
| * |
| * Copyright (c) 2012-2013, Magnus Edenhill |
| * All rights reserved. |
| * |
| * Redistribution and use in source and binary forms, with or without |
| * modification, are permitted provided that the following conditions are met: |
| * |
| * 1. Redistributions of source code must retain the above copyright notice, |
| * this list of conditions and the following disclaimer. |
| * 2. Redistributions in binary form must reproduce the above copyright notice, |
| * this list of conditions and the following disclaimer in the documentation |
| * and/or other materials provided with the distribution. |
| * |
| * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
| * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
| * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
| * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
| * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
| * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
| * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
| * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
| * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
| * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
| * POSSIBILITY OF SUCH DAMAGE. |
| */ |
| |
| |
| #define _GNU_SOURCE |
| #include <errno.h> |
| #include <string.h> |
| #include <stdarg.h> |
| #include <signal.h> |
| #include <stdlib.h> |
| #include <sys/stat.h> |
| |
| #include "rdkafka_int.h" |
| #include "rdkafka_msg.h" |
| #include "rdkafka_broker.h" |
| #include "rdkafka_topic.h" |
| #include "rdkafka_partition.h" |
| #include "rdkafka_offset.h" |
| #include "rdkafka_transport.h" |
| #include "rdkafka_cgrp.h" |
| #include "rdkafka_assignor.h" |
| #include "rdkafka_request.h" |
| #include "rdkafka_event.h" |
| #include "rdkafka_sasl.h" |
| #include "rdkafka_interceptor.h" |
| |
| #include "rdtime.h" |
| #include "crc32c.h" |
| #include "rdunittest.h" |
| |
| #ifdef _MSC_VER |
| #include <sys/types.h> |
| #include <sys/timeb.h> |
| #endif |
| |
| |
| |
| static once_flag rd_kafka_global_init_once = ONCE_FLAG_INIT; |
| |
| /** |
| * @brief Global counter+lock for all active librdkafka instances |
| */ |
| mtx_t rd_kafka_global_lock; |
| int rd_kafka_global_cnt; |
| |
| |
| /** |
| * Last API error code, per thread. |
| * Shared among all rd_kafka_t instances. |
| */ |
| rd_kafka_resp_err_t RD_TLS rd_kafka_last_error_code; |
| |
| |
| /** |
| * Current number of threads created by rdkafka. |
| * This is used in regression tests. |
| */ |
| rd_atomic32_t rd_kafka_thread_cnt_curr; |
| int rd_kafka_thread_cnt (void) { |
| #if ENABLE_SHAREDPTR_DEBUG |
| rd_shared_ptrs_dump(); |
| #endif |
| |
| return rd_atomic32_get(&rd_kafka_thread_cnt_curr); |
| } |
| |
| /** |
| * Current thread's name (TLS) |
| */ |
| char RD_TLS rd_kafka_thread_name[64] = "app"; |
| |
| |
| |
| static void rd_kafka_global_init (void) { |
| #if ENABLE_SHAREDPTR_DEBUG |
| LIST_INIT(&rd_shared_ptr_debug_list); |
| mtx_init(&rd_shared_ptr_debug_mtx, mtx_plain); |
| atexit(rd_shared_ptrs_dump); |
| #endif |
| mtx_init(&rd_kafka_global_lock, mtx_plain); |
| #if ENABLE_DEVEL |
| rd_atomic32_init(&rd_kafka_op_cnt, 0); |
| #endif |
| crc32c_global_init(); |
| } |
| |
| /** |
| * @returns the current number of active librdkafka instances |
| */ |
| static int rd_kafka_global_cnt_get (void) { |
| int r; |
| mtx_lock(&rd_kafka_global_lock); |
| r = rd_kafka_global_cnt; |
| mtx_unlock(&rd_kafka_global_lock); |
| return r; |
| } |
| |
| |
| /** |
| * @brief Increase counter for active librdkafka instances. |
| * If this is the first instance the global constructors will be called, if any. |
| */ |
| static void rd_kafka_global_cnt_incr (void) { |
| mtx_lock(&rd_kafka_global_lock); |
| rd_kafka_global_cnt++; |
| if (rd_kafka_global_cnt == 1) { |
| rd_kafka_transport_init(); |
| #if WITH_SSL |
| rd_kafka_transport_ssl_init(); |
| #endif |
| rd_kafka_sasl_global_init(); |
| } |
| mtx_unlock(&rd_kafka_global_lock); |
| } |
| |
| /** |
| * @brief Decrease counter for active librdkafka instances. |
| * If this counter reaches 0 the global destructors will be called, if any. |
| */ |
| static void rd_kafka_global_cnt_decr (void) { |
| mtx_lock(&rd_kafka_global_lock); |
| rd_kafka_assert(NULL, rd_kafka_global_cnt > 0); |
| rd_kafka_global_cnt--; |
| if (rd_kafka_global_cnt == 0) { |
| rd_kafka_sasl_global_term(); |
| #if WITH_SSL |
| rd_kafka_transport_ssl_term(); |
| #endif |
| } |
| mtx_unlock(&rd_kafka_global_lock); |
| } |
| |
| |
| /** |
| * Wait for all rd_kafka_t objects to be destroyed. |
| * Returns 0 if all kafka objects are now destroyed, or -1 if the |
| * timeout was reached. |
| */ |
| int rd_kafka_wait_destroyed (int timeout_ms) { |
| rd_ts_t timeout = rd_clock() + (timeout_ms * 1000); |
| |
| while (rd_kafka_thread_cnt() > 0 || |
| rd_kafka_global_cnt_get() > 0) { |
| if (rd_clock() >= timeout) { |
| rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__TIMED_OUT, |
| ETIMEDOUT); |
| #if ENABLE_SHAREDPTR_DEBUG |
| rd_shared_ptrs_dump(); |
| #endif |
| return -1; |
| } |
| rd_usleep(25000, NULL); /* 25ms */ |
| } |
| |
| return 0; |
| } |
| |
| static void rd_kafka_log_buf (const rd_kafka_conf_t *conf, |
| const rd_kafka_t *rk, int level, const char *fac, |
| const char *buf) { |
| if (level > conf->log_level) |
| return; |
| else if (rk && conf->log_queue) { |
| rd_kafka_op_t *rko; |
| |
| if (!rk->rk_logq) |
| return; /* Terminating */ |
| |
| rko = rd_kafka_op_new(RD_KAFKA_OP_LOG); |
| rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_MEDIUM); |
| rko->rko_u.log.level = level; |
| strncpy(rko->rko_u.log.fac, fac, |
| sizeof(rko->rko_u.log.fac) - 1); |
| rko->rko_u.log.str = rd_strdup(buf); |
| rd_kafka_q_enq(rk->rk_logq, rko); |
| |
| } else if (conf->log_cb) { |
| conf->log_cb(rk, level, fac, buf); |
| } |
| } |
| |
| /** |
| * @brief Logger |
| * |
| * @remark conf must be set, but rk may be NULL |
| */ |
| void rd_kafka_log0 (const rd_kafka_conf_t *conf, |
| const rd_kafka_t *rk, |
| const char *extra, int level, |
| const char *fac, const char *fmt, ...) { |
| char buf[2048]; |
| va_list ap; |
| unsigned int elen = 0; |
| unsigned int of = 0; |
| |
| if (level > conf->log_level) |
| return; |
| |
| if (conf->log_thread_name) { |
| elen = rd_snprintf(buf, sizeof(buf), "[thrd:%s]: ", |
| rd_kafka_thread_name); |
| if (unlikely(elen >= sizeof(buf))) |
| elen = sizeof(buf); |
| of = elen; |
| } |
| |
| if (extra) { |
| elen = rd_snprintf(buf+of, sizeof(buf)-of, "%s: ", extra); |
| if (unlikely(elen >= sizeof(buf)-of)) |
| elen = sizeof(buf)-of; |
| of += elen; |
| } |
| |
| va_start(ap, fmt); |
| rd_vsnprintf(buf+of, sizeof(buf)-of, fmt, ap); |
| va_end(ap); |
| |
| rd_kafka_log_buf(conf, rk, level, fac, buf); |
| } |
| |
| |
| |
| void rd_kafka_log_print(const rd_kafka_t *rk, int level, |
| const char *fac, const char *buf) { |
| int secs, msecs; |
| struct timeval tv; |
| rd_gettimeofday(&tv, NULL); |
| secs = (int)tv.tv_sec; |
| msecs = (int)(tv.tv_usec / 1000); |
| fprintf(stderr, "%%%i|%u.%03u|%s|%s| %s\n", |
| level, secs, msecs, |
| fac, rk ? rk->rk_name : "", buf); |
| } |
| |
| #ifndef _MSC_VER |
| void rd_kafka_log_syslog (const rd_kafka_t *rk, int level, |
| const char *fac, const char *buf) { |
| static int initialized = 0; |
| |
| if (!initialized) |
| openlog("rdkafka", LOG_PID|LOG_CONS, LOG_USER); |
| |
| syslog(level, "%s: %s: %s", fac, rk ? rk->rk_name : "", buf); |
| } |
| #endif |
| |
| void rd_kafka_set_logger (rd_kafka_t *rk, |
| void (*func) (const rd_kafka_t *rk, int level, |
| const char *fac, const char *buf)) { |
| rk->rk_conf.log_cb = func; |
| } |
| |
| void rd_kafka_set_log_level (rd_kafka_t *rk, int level) { |
| rk->rk_conf.log_level = level; |
| } |
| |
| |
| |
| |
| |
| |
| static const char *rd_kafka_type2str (rd_kafka_type_t type) { |
| static const char *types[] = { |
| [RD_KAFKA_PRODUCER] = "producer", |
| [RD_KAFKA_CONSUMER] = "consumer", |
| }; |
| return types[type]; |
| } |
| |
| #define _ERR_DESC(ENUM,DESC) \ |
| [ENUM - RD_KAFKA_RESP_ERR__BEGIN] = { ENUM, # ENUM + 18/*pfx*/, DESC } |
| |
| static const struct rd_kafka_err_desc rd_kafka_err_descs[] = { |
| _ERR_DESC(RD_KAFKA_RESP_ERR__BEGIN, NULL), |
| _ERR_DESC(RD_KAFKA_RESP_ERR__BAD_MSG, |
| "Local: Bad message format"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR__BAD_COMPRESSION, |
| "Local: Invalid compressed data"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR__DESTROY, |
| "Local: Broker handle destroyed"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR__FAIL, |
| "Local: Communication failure with broker"), //FIXME: too specific |
| _ERR_DESC(RD_KAFKA_RESP_ERR__TRANSPORT, |
| "Local: Broker transport failure"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE, |
| "Local: Critical system resource failure"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR__RESOLVE, |
| "Local: Host resolution failure"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR__MSG_TIMED_OUT, |
| "Local: Message timed out"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR__PARTITION_EOF, |
| "Broker: No more messages"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION, |
| "Local: Unknown partition"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR__FS, |
| "Local: File or filesystem error"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC, |
| "Local: Unknown topic"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN, |
| "Local: All broker connections are down"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR__INVALID_ARG, |
| "Local: Invalid argument or configuration"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR__TIMED_OUT, |
| "Local: Timed out"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR__QUEUE_FULL, |
| "Local: Queue full"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR__ISR_INSUFF, |
| "Local: ISR count insufficient"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR__NODE_UPDATE, |
| "Local: Broker node update"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR__SSL, |
| "Local: SSL error"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR__WAIT_COORD, |
| "Local: Waiting for coordinator"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_GROUP, |
| "Local: Unknown group"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR__IN_PROGRESS, |
| "Local: Operation in progress"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS, |
| "Local: Previous operation in progress"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR__EXISTING_SUBSCRIPTION, |
| "Local: Existing subscription"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS, |
| "Local: Assign partitions"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS, |
| "Local: Revoke partitions"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR__CONFLICT, |
| "Local: Conflicting use"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR__STATE, |
| "Local: Erroneous state"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL, |
| "Local: Unknown protocol"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED, |
| "Local: Not implemented"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR__AUTHENTICATION, |
| "Local: Authentication failure"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR__NO_OFFSET, |
| "Local: No offset stored"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR__OUTDATED, |
| "Local: Outdated"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE, |
| "Local: Timed out in queue"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE, |
| "Local: Required feature not supported by broker"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR__WAIT_CACHE, |
| "Local: Awaiting cache update"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR__INTR, |
| "Local: Operation interrupted"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR__KEY_SERIALIZATION, |
| "Local: Key serialization error"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR__VALUE_SERIALIZATION, |
| "Local: Value serialization error"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR__KEY_DESERIALIZATION, |
| "Local: Key deserialization error"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR__VALUE_DESERIALIZATION, |
| "Local: Value deserialization error"), |
| |
| _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN, |
| "Unknown broker error"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_NO_ERROR, |
| "Success"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE, |
| "Broker: Offset out of range"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_MSG, |
| "Broker: Invalid message"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART, |
| "Broker: Unknown topic or partition"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE, |
| "Broker: Invalid message size"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE, |
| "Broker: Leader not available"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION, |
| "Broker: Not leader for partition"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT, |
| "Broker: Request timed out"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE, |
| "Broker: Broker not available"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE, |
| "Broker: Replica not available"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE, |
| "Broker: Message size too large"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_STALE_CTRL_EPOCH, |
| "Broker: StaleControllerEpochCode"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE, |
| "Broker: Offset metadata string too large"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_NETWORK_EXCEPTION, |
| "Broker: Broker disconnected before response received"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_GROUP_LOAD_IN_PROGRESS, |
| "Broker: Group coordinator load in progress"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE, |
| "Broker: Group coordinator not available"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP, |
| "Broker: Not coordinator for group"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION, |
| "Broker: Invalid topic"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_RECORD_LIST_TOO_LARGE, |
| "Broker: Message batch larger than configured server " |
| "segment size"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS, |
| "Broker: Not enough in-sync replicas"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND, |
| "Broker: Message(s) written to insufficient number of " |
| "in-sync replicas"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REQUIRED_ACKS, |
| "Broker: Invalid required acks value"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION, |
| "Broker: Specified group generation id is not valid"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_INCONSISTENT_GROUP_PROTOCOL, |
| "Broker: Inconsistent group protocol"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_GROUP_ID, |
| "Broker: Invalid group.id"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID, |
| "Broker: Unknown member"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT, |
| "Broker: Invalid session timeout"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS, |
| "Broker: Group rebalance in progress"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE, |
| "Broker: Commit offset data size is not valid"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED, |
| "Broker: Topic authorization failed"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED, |
| "Broker: Group authorization failed"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED, |
| "Broker: Cluster authorization failed"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_TIMESTAMP, |
| "Broker: Invalid timestamp"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_SASL_MECHANISM, |
| "Broker: Unsupported SASL mechanism"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_ILLEGAL_SASL_STATE, |
| "Broker: Request not valid in current SASL state"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION, |
| "Broker: API version not supported"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_TOPIC_ALREADY_EXISTS, |
| "Broker: Topic already exists"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_PARTITIONS, |
| "Broker: Invalid number of partitions"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REPLICATION_FACTOR, |
| "Broker: Invalid replication factor"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REPLICA_ASSIGNMENT, |
| "Broker: Invalid replica assignment"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_CONFIG, |
| "Broker: Configuration is invalid"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_CONTROLLER, |
| "Broker: Not controller for cluster"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REQUEST, |
| "Broker: Invalid request"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT, |
| "Broker: Message format on broker does not support request"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_POLICY_VIOLATION, |
| "Broker: Isolation policy volation"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER, |
| "Broker: Broker received an out of order sequence number"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_DUPLICATE_SEQUENCE_NUMBER, |
| "Broker: Broker received a duplicate sequence number"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH, |
| "Broker: Producer attempted an operation with an old epoch"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_TXN_STATE, |
| "Broker: Producer attempted a transactional operation in " |
| "an invalid state"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING, |
| "Broker: Producer attempted to use a producer id which is " |
| "not currently assigned to its transactional id"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_TRANSACTION_TIMEOUT, |
| "Broker: Transaction timeout is larger than the maximum " |
| "value allowed by the broker's max.transaction.timeout.ms"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS, |
| "Broker: Producer attempted to update a transaction while " |
| "another concurrent operation on the same transaction was " |
| "ongoing"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_TRANSACTION_COORDINATOR_FENCED, |
| "Broker: Indicates that the transaction coordinator sending " |
| "a WriteTxnMarker is no longer the current coordinator for " |
| "a given producer"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED, |
| "Broker: Transactional Id authorization failed"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_SECURITY_DISABLED, |
| "Broker: Security features are disabled"), |
| _ERR_DESC(RD_KAFKA_RESP_ERR_OPERATION_NOT_ATTEMPTED, |
| "Broker: Operation not attempted"), |
| |
| _ERR_DESC(RD_KAFKA_RESP_ERR__END, NULL) |
| }; |
| |
| |
| void rd_kafka_get_err_descs (const struct rd_kafka_err_desc **errdescs, |
| size_t *cntp) { |
| *errdescs = rd_kafka_err_descs; |
| *cntp = RD_ARRAYSIZE(rd_kafka_err_descs); |
| } |
| |
| |
| const char *rd_kafka_err2str (rd_kafka_resp_err_t err) { |
| static RD_TLS char ret[32]; |
| int idx = err - RD_KAFKA_RESP_ERR__BEGIN; |
| |
| if (unlikely(err <= RD_KAFKA_RESP_ERR__BEGIN || |
| err >= RD_KAFKA_RESP_ERR_END_ALL || |
| !rd_kafka_err_descs[idx].desc)) { |
| rd_snprintf(ret, sizeof(ret), "Err-%i?", err); |
| return ret; |
| } |
| |
| return rd_kafka_err_descs[idx].desc; |
| } |
| |
| |
| const char *rd_kafka_err2name (rd_kafka_resp_err_t err) { |
| static RD_TLS char ret[32]; |
| int idx = err - RD_KAFKA_RESP_ERR__BEGIN; |
| |
| if (unlikely(err <= RD_KAFKA_RESP_ERR__BEGIN || |
| err >= RD_KAFKA_RESP_ERR_END_ALL || |
| !rd_kafka_err_descs[idx].desc)) { |
| rd_snprintf(ret, sizeof(ret), "ERR_%i?", err); |
| return ret; |
| } |
| |
| return rd_kafka_err_descs[idx].name; |
| } |
| |
| |
| rd_kafka_resp_err_t rd_kafka_last_error (void) { |
| return rd_kafka_last_error_code; |
| } |
| |
| |
| rd_kafka_resp_err_t rd_kafka_errno2err (int errnox) { |
| switch (errnox) |
| { |
| case EINVAL: |
| return RD_KAFKA_RESP_ERR__INVALID_ARG; |
| |
| case EBUSY: |
| return RD_KAFKA_RESP_ERR__CONFLICT; |
| |
| case ENOENT: |
| return RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC; |
| |
| case ESRCH: |
| return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION; |
| |
| case ETIMEDOUT: |
| return RD_KAFKA_RESP_ERR__TIMED_OUT; |
| |
| case EMSGSIZE: |
| return RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE; |
| |
| case ENOBUFS: |
| return RD_KAFKA_RESP_ERR__QUEUE_FULL; |
| |
| default: |
| return RD_KAFKA_RESP_ERR__FAIL; |
| } |
| } |
| |
| |
| |
| /** |
| * @brief Final destructor for rd_kafka_t, must only be called with refcnt 0. |
| * |
| * @locality application thread |
| */ |
| void rd_kafka_destroy_final (rd_kafka_t *rk) { |
| |
| rd_kafka_assert(rk, rd_atomic32_get(&rk->rk_terminate) != 0); |
| |
| /* Synchronize state */ |
| rd_kafka_wrlock(rk); |
| rd_kafka_wrunlock(rk); |
| |
| rd_kafka_assignors_term(rk); |
| |
| rd_kafka_metadata_cache_destroy(rk); |
| |
| rd_kafka_timers_destroy(&rk->rk_timers); |
| |
| rd_kafka_dbg(rk, GENERIC, "TERMINATE", "Destroying op queues"); |
| |
| /* Destroy cgrp */ |
| if (rk->rk_cgrp) { |
| rd_kafka_dbg(rk, GENERIC, "TERMINATE", |
| "Destroying cgrp"); |
| /* Reset queue forwarding (rep -> cgrp) */ |
| rd_kafka_q_fwd_set(rk->rk_rep, NULL); |
| rd_kafka_cgrp_destroy_final(rk->rk_cgrp); |
| } |
| |
| /* Purge op-queues */ |
| rd_kafka_q_destroy(rk->rk_rep); |
| rd_kafka_q_destroy(rk->rk_ops); |
| |
| #if WITH_SSL |
| if (rk->rk_conf.ssl.ctx) { |
| rd_kafka_dbg(rk, GENERIC, "TERMINATE", "Destroying SSL CTX"); |
| rd_kafka_transport_ssl_ctx_term(rk); |
| } |
| #endif |
| |
| /* It is not safe to log after this point. */ |
| rd_kafka_dbg(rk, GENERIC, "TERMINATE", |
| "Termination done: freeing resources"); |
| |
| if (rk->rk_logq) { |
| rd_kafka_q_destroy(rk->rk_logq); |
| rk->rk_logq = NULL; |
| } |
| |
| if (rk->rk_type == RD_KAFKA_PRODUCER) { |
| cnd_destroy(&rk->rk_curr_msgs.cnd); |
| mtx_destroy(&rk->rk_curr_msgs.lock); |
| } |
| |
| cnd_destroy(&rk->rk_broker_state_change_cnd); |
| mtx_destroy(&rk->rk_broker_state_change_lock); |
| |
| if (rk->rk_full_metadata) |
| rd_kafka_metadata_destroy(rk->rk_full_metadata); |
| rd_kafkap_str_destroy(rk->rk_client_id); |
| rd_kafkap_str_destroy(rk->rk_group_id); |
| rd_kafkap_str_destroy(rk->rk_eos.TransactionalId); |
| rd_kafka_anyconf_destroy(_RK_GLOBAL, &rk->rk_conf); |
| rd_list_destroy(&rk->rk_broker_by_id); |
| |
| rd_kafkap_bytes_destroy((rd_kafkap_bytes_t *)rk->rk_null_bytes); |
| rwlock_destroy(&rk->rk_lock); |
| |
| rd_free(rk); |
| rd_kafka_global_cnt_decr(); |
| } |
| |
| |
| static void rd_kafka_destroy_app (rd_kafka_t *rk, int blocking) { |
| thrd_t thrd; |
| #ifndef _MSC_VER |
| int term_sig = rk->rk_conf.term_sig; |
| #endif |
| rd_kafka_dbg(rk, ALL, "DESTROY", "Terminating instance"); |
| |
| /* The legacy/simple consumer lacks an API to close down the consumer*/ |
| if (rk->rk_cgrp) { |
| rd_kafka_dbg(rk, GENERIC, "TERMINATE", |
| "Closing consumer group"); |
| rd_kafka_consumer_close(rk); |
| } |
| |
| rd_kafka_dbg(rk, GENERIC, "TERMINATE", "Interrupting timers"); |
| rd_kafka_wrlock(rk); |
| thrd = rk->rk_thread; |
| rd_atomic32_add(&rk->rk_terminate, 1); |
| rd_kafka_timers_interrupt(&rk->rk_timers); |
| rd_kafka_wrunlock(rk); |
| |
| rd_kafka_dbg(rk, GENERIC, "TERMINATE", |
| "Sending TERMINATE to main background thread"); |
| /* Send op to trigger queue/io wake-up. |
| * The op itself is (likely) ignored by the receiver. */ |
| rd_kafka_q_enq(rk->rk_ops, rd_kafka_op_new(RD_KAFKA_OP_TERMINATE)); |
| |
| rd_kafka_brokers_broadcast_state_change(rk); |
| |
| #ifndef _MSC_VER |
| /* Interrupt main kafka thread to speed up termination. */ |
| if (term_sig) { |
| rd_kafka_dbg(rk, GENERIC, "TERMINATE", |
| "Sending thread kill signal %d", term_sig); |
| pthread_kill(thrd, term_sig); |
| } |
| #endif |
| |
| if (!blocking) |
| return; /* FIXME: thread resource leak */ |
| |
| rd_kafka_dbg(rk, GENERIC, "TERMINATE", |
| "Joining main background thread"); |
| |
| if (thrd_join(thrd, NULL) != thrd_success) |
| rd_kafka_assert(NULL, !*"failed to join main thread"); |
| |
| rd_kafka_destroy_final(rk); |
| } |
| |
| |
| /* NOTE: Must only be called by application. |
| * librdkafka itself must use rd_kafka_destroy0(). */ |
| void rd_kafka_destroy (rd_kafka_t *rk) { |
| rd_kafka_destroy_app(rk, 1); |
| } |
| |
| |
| /** |
| * Main destructor for rd_kafka_t |
| * |
| * Locality: rdkafka main thread or application thread during rd_kafka_new() |
| */ |
| static void rd_kafka_destroy_internal (rd_kafka_t *rk) { |
| rd_kafka_itopic_t *rkt, *rkt_tmp; |
| rd_kafka_broker_t *rkb, *rkb_tmp; |
| rd_list_t wait_thrds; |
| thrd_t *thrd; |
| int i; |
| |
| rd_kafka_dbg(rk, ALL, "DESTROY", "Destroy internal"); |
| |
| /* Call on_destroy() interceptors */ |
| rd_kafka_interceptors_on_destroy(rk); |
| |
| /* Brokers pick up on rk_terminate automatically. */ |
| |
| /* List of (broker) threads to join to synchronize termination */ |
| rd_list_init(&wait_thrds, rd_atomic32_get(&rk->rk_broker_cnt), NULL); |
| |
| rd_kafka_wrlock(rk); |
| |
| rd_kafka_dbg(rk, ALL, "DESTROY", "Removing all topics"); |
| /* Decommission all topics */ |
| TAILQ_FOREACH_SAFE(rkt, &rk->rk_topics, rkt_link, rkt_tmp) { |
| rd_kafka_wrunlock(rk); |
| rd_kafka_topic_partitions_remove(rkt); |
| rd_kafka_wrlock(rk); |
| } |
| |
| /* Decommission brokers. |
| * Broker thread holds a refcount and detects when broker refcounts |
| * reaches 1 and then decommissions itself. */ |
| TAILQ_FOREACH_SAFE(rkb, &rk->rk_brokers, rkb_link, rkb_tmp) { |
| /* Add broker's thread to wait_thrds list for later joining */ |
| thrd = malloc(sizeof(*thrd)); |
| *thrd = rkb->rkb_thread; |
| rd_list_add(&wait_thrds, thrd); |
| rd_kafka_wrunlock(rk); |
| |
| /* Send op to trigger queue/io wake-up. |
| * The op itself is (likely) ignored by the broker thread. */ |
| rd_kafka_q_enq(rkb->rkb_ops, |
| rd_kafka_op_new(RD_KAFKA_OP_TERMINATE)); |
| |
| #ifndef _MSC_VER |
| /* Interrupt IO threads to speed up termination. */ |
| if (rk->rk_conf.term_sig) |
| pthread_kill(rkb->rkb_thread, rk->rk_conf.term_sig); |
| #endif |
| |
| rd_kafka_broker_destroy(rkb); |
| |
| rd_kafka_wrlock(rk); |
| } |
| |
| if (rk->rk_clusterid) { |
| rd_free(rk->rk_clusterid); |
| rk->rk_clusterid = NULL; |
| } |
| |
| rd_kafka_wrunlock(rk); |
| |
| rd_kafka_dbg(rk, GENERIC, "TERMINATE", |
| "Purging reply queue"); |
| |
| /* Purge op-queue */ |
| rd_kafka_q_disable(rk->rk_rep); |
| rd_kafka_q_purge(rk->rk_rep); |
| |
| /* Loose our special reference to the internal broker. */ |
| mtx_lock(&rk->rk_internal_rkb_lock); |
| if ((rkb = rk->rk_internal_rkb)) { |
| rd_kafka_dbg(rk, GENERIC, "TERMINATE", |
| "Decommissioning internal broker"); |
| |
| /* Send op to trigger queue wake-up. */ |
| rd_kafka_q_enq(rkb->rkb_ops, |
| rd_kafka_op_new(RD_KAFKA_OP_TERMINATE)); |
| |
| rk->rk_internal_rkb = NULL; |
| thrd = malloc(sizeof(*thrd)); |
| *thrd = rkb->rkb_thread; |
| rd_list_add(&wait_thrds, thrd); |
| } |
| mtx_unlock(&rk->rk_internal_rkb_lock); |
| if (rkb) |
| rd_kafka_broker_destroy(rkb); |
| |
| |
| rd_kafka_dbg(rk, GENERIC, "TERMINATE", |
| "Join %d broker thread(s)", rd_list_cnt(&wait_thrds)); |
| |
| /* Join broker threads */ |
| RD_LIST_FOREACH(thrd, &wait_thrds, i) { |
| if (thrd_join(*thrd, NULL) != thrd_success) |
| ; |
| free(thrd); |
| } |
| |
| rd_list_destroy(&wait_thrds); |
| } |
| |
| |
| /* Stats buffer printf */ |
| #define _st_printf(...) do { \ |
| ssize_t r; \ |
| ssize_t rem = size-of; \ |
| r = rd_snprintf(buf+of, rem, __VA_ARGS__); \ |
| if (r >= rem) { \ |
| size *= 2; \ |
| rem = size-of; \ |
| buf = rd_realloc(buf, size); \ |
| r = rd_snprintf(buf+of, rem, __VA_ARGS__); \ |
| } \ |
| of += r; \ |
| } while (0) |
| |
| /** |
| * Emit stats for toppar |
| */ |
| static RD_INLINE void rd_kafka_stats_emit_toppar (char **bufp, size_t *sizep, |
| size_t *ofp, |
| rd_kafka_toppar_t *rktp, |
| int first) { |
| char *buf = *bufp; |
| size_t size = *sizep; |
| size_t of = *ofp; |
| int64_t consumer_lag = -1; |
| struct offset_stats offs; |
| int32_t leader_nodeid = -1; |
| |
| rd_kafka_toppar_lock(rktp); |
| |
| if (rktp->rktp_leader) { |
| rd_kafka_broker_lock(rktp->rktp_leader); |
| leader_nodeid = rktp->rktp_leader->rkb_nodeid; |
| rd_kafka_broker_unlock(rktp->rktp_leader); |
| } |
| |
| /* Grab a copy of the latest finalized offset stats */ |
| offs = rktp->rktp_offsets_fin; |
| |
| if (rktp->rktp_hi_offset != RD_KAFKA_OFFSET_INVALID && |
| rktp->rktp_app_offset >= 0) { |
| if (unlikely(rktp->rktp_app_offset > rktp->rktp_hi_offset)) |
| consumer_lag = 0; |
| else |
| consumer_lag = rktp->rktp_hi_offset - |
| rktp->rktp_app_offset; |
| } |
| |
| _st_printf("%s\"%"PRId32"\": { " |
| "\"partition\":%"PRId32", " |
| "\"leader\":%"PRId32", " |
| "\"desired\":%s, " |
| "\"unknown\":%s, " |
| "\"msgq_cnt\":%i, " |
| "\"msgq_bytes\":%"PRIu64", " |
| "\"xmit_msgq_cnt\":%i, " |
| "\"xmit_msgq_bytes\":%"PRIu64", " |
| "\"fetchq_cnt\":%i, " |
| "\"fetchq_size\":%"PRIu64", " |
| "\"fetch_state\":\"%s\", " |
| "\"query_offset\":%"PRId64", " |
| "\"next_offset\":%"PRId64", " |
| "\"app_offset\":%"PRId64", " |
| "\"stored_offset\":%"PRId64", " |
| "\"commited_offset\":%"PRId64", " /*FIXME: issue #80 */ |
| "\"committed_offset\":%"PRId64", " |
| "\"eof_offset\":%"PRId64", " |
| "\"lo_offset\":%"PRId64", " |
| "\"hi_offset\":%"PRId64", " |
| "\"consumer_lag\":%"PRId64", " |
| "\"txmsgs\":%"PRIu64", " |
| "\"txbytes\":%"PRIu64", " |
| "\"msgs\": %"PRIu64", " |
| "\"rx_ver_drops\": %"PRIu64" " |
| "} ", |
| first ? "" : ", ", |
| rktp->rktp_partition, |
| rktp->rktp_partition, |
| leader_nodeid, |
| (rktp->rktp_flags&RD_KAFKA_TOPPAR_F_DESIRED)?"true":"false", |
| (rktp->rktp_flags&RD_KAFKA_TOPPAR_F_UNKNOWN)?"true":"false", |
| rd_atomic32_get(&rktp->rktp_msgq.rkmq_msg_cnt), |
| rd_atomic64_get(&rktp->rktp_msgq.rkmq_msg_bytes), |
| rd_atomic32_get(&rktp->rktp_xmit_msgq.rkmq_msg_cnt), |
| rd_atomic64_get(&rktp->rktp_xmit_msgq.rkmq_msg_bytes), |
| rd_kafka_q_len(rktp->rktp_fetchq), |
| rd_kafka_q_size(rktp->rktp_fetchq), |
| rd_kafka_fetch_states[rktp->rktp_fetch_state], |
| rktp->rktp_query_offset, |
| offs.fetch_offset, |
| rktp->rktp_app_offset, |
| rktp->rktp_stored_offset, |
| rktp->rktp_committed_offset, /* FIXME: issue #80 */ |
| rktp->rktp_committed_offset, |
| offs.eof_offset, |
| rktp->rktp_lo_offset, |
| rktp->rktp_hi_offset, |
| consumer_lag, |
| rd_atomic64_get(&rktp->rktp_c.tx_msgs), |
| rd_atomic64_get(&rktp->rktp_c.tx_bytes), |
| rd_atomic64_get(&rktp->rktp_c.msgs), |
| rd_atomic64_get(&rktp->rktp_c.rx_ver_drops)); |
| |
| rd_kafka_toppar_unlock(rktp); |
| |
| *bufp = buf; |
| *sizep = size; |
| *ofp = of; |
| } |
| |
| /** |
| * Emit all statistics |
| */ |
| static void rd_kafka_stats_emit_all (rd_kafka_t *rk) { |
| char *buf; |
| size_t size = 1024*10; |
| size_t of = 0; |
| rd_kafka_broker_t *rkb; |
| rd_kafka_itopic_t *rkt; |
| shptr_rd_kafka_toppar_t *s_rktp; |
| rd_ts_t now; |
| rd_kafka_op_t *rko; |
| unsigned int tot_cnt; |
| size_t tot_size; |
| |
| buf = rd_malloc(size); |
| |
| |
| rd_kafka_curr_msgs_get(rk, &tot_cnt, &tot_size); |
| rd_kafka_rdlock(rk); |
| |
| now = rd_clock(); |
| _st_printf("{ " |
| "\"name\": \"%s\", " |
| "\"type\": \"%s\", " |
| "\"ts\":%"PRId64", " |
| "\"time\":%lli, " |
| "\"replyq\":%i, " |
| "\"msg_cnt\":%u, " |
| "\"msg_size\":%"PRIusz", " |
| "\"msg_max\":%u, " |
| "\"msg_size_max\":%"PRIusz", " |
| "\"simple_cnt\":%i, " |
| "\"metadata_cache_cnt\":%i, " |
| "\"brokers\":{ "/*open brokers*/, |
| rk->rk_name, |
| rd_kafka_type2str(rk->rk_type), |
| now, |
| (signed long long)time(NULL), |
| rd_kafka_q_len(rk->rk_rep), |
| tot_cnt, tot_size, |
| rk->rk_curr_msgs.max_cnt, rk->rk_curr_msgs.max_size, |
| rd_atomic32_get(&rk->rk_simple_cnt), |
| rk->rk_metadata_cache.rkmc_cnt); |
| |
| |
| TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) { |
| rd_avg_t rtt, throttle, int_latency; |
| rd_kafka_toppar_t *rktp; |
| |
| rd_kafka_broker_lock(rkb); |
| rd_avg_rollover(&int_latency, &rkb->rkb_avg_int_latency); |
| rd_avg_rollover(&rtt, &rkb->rkb_avg_rtt); |
| rd_avg_rollover(&throttle, &rkb->rkb_avg_throttle); |
| _st_printf("%s\"%s\": { "/*open broker*/ |
| "\"name\":\"%s\", " |
| "\"nodeid\":%"PRId32", " |
| "\"state\":\"%s\", " |
| "\"stateage\":%"PRId64", " |
| "\"outbuf_cnt\":%i, " |
| "\"outbuf_msg_cnt\":%i, " |
| "\"waitresp_cnt\":%i, " |
| "\"waitresp_msg_cnt\":%i, " |
| "\"tx\":%"PRIu64", " |
| "\"txbytes\":%"PRIu64", " |
| "\"txerrs\":%"PRIu64", " |
| "\"txretries\":%"PRIu64", " |
| "\"req_timeouts\":%"PRIu64", " |
| "\"rx\":%"PRIu64", " |
| "\"rxbytes\":%"PRIu64", " |
| "\"rxerrs\":%"PRIu64", " |
| "\"rxcorriderrs\":%"PRIu64", " |
| "\"rxpartial\":%"PRIu64", " |
| "\"zbuf_grow\":%"PRIu64", " |
| "\"buf_grow\":%"PRIu64", " |
| "\"wakeups\":%"PRIu64", " |
| "\"int_latency\": {" |
| " \"min\":%"PRId64"," |
| " \"max\":%"PRId64"," |
| " \"avg\":%"PRId64"," |
| " \"sum\":%"PRId64"," |
| " \"cnt\":%i " |
| "}, " |
| "\"rtt\": {" |
| " \"min\":%"PRId64"," |
| " \"max\":%"PRId64"," |
| " \"avg\":%"PRId64"," |
| " \"sum\":%"PRId64"," |
| " \"cnt\":%i " |
| "}, " |
| "\"throttle\": {" |
| " \"min\":%"PRId64"," |
| " \"max\":%"PRId64"," |
| " \"avg\":%"PRId64"," |
| " \"sum\":%"PRId64"," |
| " \"cnt\":%i " |
| "}, " |
| "\"toppars\":{ "/*open toppars*/, |
| rkb == TAILQ_FIRST(&rk->rk_brokers) ? "" : ", ", |
| rkb->rkb_name, |
| rkb->rkb_name, |
| rkb->rkb_nodeid, |
| rd_kafka_broker_state_names[rkb->rkb_state], |
| rkb->rkb_ts_state ? now - rkb->rkb_ts_state : 0, |
| rd_atomic32_get(&rkb->rkb_outbufs.rkbq_cnt), |
| rd_atomic32_get(&rkb->rkb_outbufs.rkbq_msg_cnt), |
| rd_atomic32_get(&rkb->rkb_waitresps.rkbq_cnt), |
| rd_atomic32_get(&rkb->rkb_waitresps.rkbq_msg_cnt), |
| rd_atomic64_get(&rkb->rkb_c.tx), |
| rd_atomic64_get(&rkb->rkb_c.tx_bytes), |
| rd_atomic64_get(&rkb->rkb_c.tx_err), |
| rd_atomic64_get(&rkb->rkb_c.tx_retries), |
| rd_atomic64_get(&rkb->rkb_c.req_timeouts), |
| rd_atomic64_get(&rkb->rkb_c.rx), |
| rd_atomic64_get(&rkb->rkb_c.rx_bytes), |
| rd_atomic64_get(&rkb->rkb_c.rx_err), |
| rd_atomic64_get(&rkb->rkb_c.rx_corrid_err), |
| rd_atomic64_get(&rkb->rkb_c.rx_partial), |
| rd_atomic64_get(&rkb->rkb_c.zbuf_grow), |
| rd_atomic64_get(&rkb->rkb_c.buf_grow), |
| rd_atomic64_get(&rkb->rkb_c.wakeups), |
| int_latency.ra_v.minv, |
| int_latency.ra_v.maxv, |
| int_latency.ra_v.avg, |
| int_latency.ra_v.sum, |
| int_latency.ra_v.cnt, |
| rtt.ra_v.minv, |
| rtt.ra_v.maxv, |
| rtt.ra_v.avg, |
| rtt.ra_v.sum, |
| rtt.ra_v.cnt, |
| throttle.ra_v.minv, |
| throttle.ra_v.maxv, |
| throttle.ra_v.avg, |
| throttle.ra_v.sum, |
| throttle.ra_v.cnt); |
| |
| TAILQ_FOREACH(rktp, &rkb->rkb_toppars, rktp_rkblink) { |
| _st_printf("%s\"%.*s-%"PRId32"\": { " |
| "\"topic\":\"%.*s\", " |
| "\"partition\":%"PRId32"} ", |
| rktp==TAILQ_FIRST(&rkb->rkb_toppars)?"":", ", |
| RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
| rktp->rktp_partition, |
| RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
| rktp->rktp_partition); |
| } |
| |
| rd_kafka_broker_unlock(rkb); |
| |
| _st_printf("} "/*close toppars*/ |
| "} "/*close broker*/); |
| } |
| |
| |
| _st_printf("}, " /* close "brokers" array */ |
| "\"topics\":{ "); |
| |
| TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) { |
| int i, j; |
| |
| rd_kafka_topic_rdlock(rkt); |
| _st_printf("%s\"%.*s\": { " |
| "\"topic\":\"%.*s\", " |
| "\"metadata_age\":%"PRId64", " |
| "\"partitions\":{ " /*open partitions*/, |
| rkt==TAILQ_FIRST(&rk->rk_topics)?"":", ", |
| RD_KAFKAP_STR_PR(rkt->rkt_topic), |
| RD_KAFKAP_STR_PR(rkt->rkt_topic), |
| rkt->rkt_ts_metadata ? |
| (rd_clock() - rkt->rkt_ts_metadata)/1000 : 0); |
| |
| for (i = 0 ; i < rkt->rkt_partition_cnt ; i++) |
| rd_kafka_stats_emit_toppar(&buf, &size, &of, |
| rd_kafka_toppar_s2i(rkt->rkt_p[i]), |
| i == 0); |
| |
| RD_LIST_FOREACH(s_rktp, &rkt->rkt_desp, j) |
| rd_kafka_stats_emit_toppar(&buf, &size, &of, |
| rd_kafka_toppar_s2i(s_rktp), |
| i+j == 0); |
| |
| i += j; |
| |
| if (rkt->rkt_ua) |
| rd_kafka_stats_emit_toppar(&buf, &size, &of, |
| rd_kafka_toppar_s2i(rkt->rkt_ua), |
| i++ == 0); |
| rd_kafka_topic_rdunlock(rkt); |
| |
| _st_printf("} "/*close partitions*/ |
| "} "/*close topic*/); |
| |
| } |
| _st_printf("} "/*close topics*/); |
| |
| if (rk->rk_cgrp) { |
| rd_kafka_cgrp_t *rkcg = rk->rk_cgrp; |
| _st_printf(", \"cgrp\": { " |
| "\"rebalance_age\": %"PRId64", " |
| "\"rebalance_cnt\": %d, " |
| "\"assignment_size\": %d }", |
| rkcg->rkcg_c.ts_rebalance ? |
| (rd_clock() - rkcg->rkcg_c.ts_rebalance)/1000 : 0, |
| rkcg->rkcg_c.rebalance_cnt, |
| rkcg->rkcg_c.assignment_size); |
| } |
| rd_kafka_rdunlock(rk); |
| |
| _st_printf("}"/*close object*/); |
| |
| |
| /* Enqueue op for application */ |
| rko = rd_kafka_op_new(RD_KAFKA_OP_STATS); |
| rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_HIGH); |
| rko->rko_u.stats.json = buf; |
| rko->rko_u.stats.json_len = of; |
| rd_kafka_q_enq(rk->rk_rep, rko); |
| } |
| |
| |
| |
| static void rd_kafka_topic_scan_tmr_cb (rd_kafka_timers_t *rkts, void *arg) { |
| rd_kafka_t *rk = rkts->rkts_rk; |
| rd_kafka_topic_scan_all(rk, rd_clock()); |
| } |
| |
| static void rd_kafka_stats_emit_tmr_cb (rd_kafka_timers_t *rkts, void *arg) { |
| rd_kafka_t *rk = rkts->rkts_rk; |
| rd_kafka_stats_emit_all(rk); |
| } |
| |
| |
| /** |
| * @brief Periodic metadata refresh callback |
| * |
| * @locality rdkafka main thread |
| */ |
| static void rd_kafka_metadata_refresh_cb (rd_kafka_timers_t *rkts, void *arg) { |
| rd_kafka_t *rk = rkts->rkts_rk; |
| int sparse = 1; |
| |
| /* Dont do sparse requests if there is a consumer group with an |
| * active subscription since subscriptions need to be able to match |
| * on all topics. */ |
| if (rk->rk_type == RD_KAFKA_CONSUMER && rk->rk_cgrp && |
| rk->rk_cgrp->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION) |
| sparse = 0; |
| |
| if (sparse) |
| rd_kafka_metadata_refresh_known_topics(rk, NULL, 1/*force*/, |
| "periodic refresh"); |
| else |
| rd_kafka_metadata_refresh_all(rk, NULL, "periodic refresh"); |
| } |
| |
| |
| /** |
| * Main loop for Kafka handler thread. |
| */ |
| static int rd_kafka_thread_main (void *arg) { |
| rd_kafka_t *rk = arg; |
| rd_kafka_timer_t tmr_topic_scan = RD_ZERO_INIT; |
| rd_kafka_timer_t tmr_stats_emit = RD_ZERO_INIT; |
| rd_kafka_timer_t tmr_metadata_refresh = RD_ZERO_INIT; |
| |
| rd_snprintf(rd_kafka_thread_name, sizeof(rd_kafka_thread_name), "main"); |
| |
| (void)rd_atomic32_add(&rd_kafka_thread_cnt_curr, 1); |
| |
| /* Acquire lock (which was held by thread creator during creation) |
| * to synchronise state. */ |
| rd_kafka_wrlock(rk); |
| rd_kafka_wrunlock(rk); |
| |
| rd_kafka_timer_start(&rk->rk_timers, &tmr_topic_scan, 1000000, |
| rd_kafka_topic_scan_tmr_cb, NULL); |
| rd_kafka_timer_start(&rk->rk_timers, &tmr_stats_emit, |
| rk->rk_conf.stats_interval_ms * 1000ll, |
| rd_kafka_stats_emit_tmr_cb, NULL); |
| if (rk->rk_conf.metadata_refresh_interval_ms > 0) |
| rd_kafka_timer_start(&rk->rk_timers, &tmr_metadata_refresh, |
| rk->rk_conf.metadata_refresh_interval_ms * |
| 1000ll, |
| rd_kafka_metadata_refresh_cb, NULL); |
| |
| if (rk->rk_cgrp) { |
| rd_kafka_cgrp_reassign_broker(rk->rk_cgrp); |
| rd_kafka_q_fwd_set(rk->rk_cgrp->rkcg_ops, rk->rk_ops); |
| } |
| |
| while (likely(!rd_kafka_terminating(rk) || |
| rd_kafka_q_len(rk->rk_ops))) { |
| rd_ts_t sleeptime = rd_kafka_timers_next( |
| &rk->rk_timers, 1000*1000/*1s*/, 1/*lock*/); |
| rd_kafka_q_serve(rk->rk_ops, (int)(sleeptime / 1000), 0, |
| RD_KAFKA_Q_CB_CALLBACK, NULL, NULL); |
| if (rk->rk_cgrp) /* FIXME: move to timer-triggered */ |
| rd_kafka_cgrp_serve(rk->rk_cgrp); |
| rd_kafka_timers_run(&rk->rk_timers, RD_POLL_NOWAIT); |
| } |
| |
| rd_kafka_q_disable(rk->rk_ops); |
| rd_kafka_q_purge(rk->rk_ops); |
| |
| rd_kafka_timer_stop(&rk->rk_timers, &tmr_topic_scan, 1); |
| rd_kafka_timer_stop(&rk->rk_timers, &tmr_stats_emit, 1); |
| rd_kafka_timer_stop(&rk->rk_timers, &tmr_metadata_refresh, 1); |
| |
| /* Synchronise state */ |
| rd_kafka_wrlock(rk); |
| rd_kafka_wrunlock(rk); |
| |
| rd_kafka_destroy_internal(rk); |
| |
| rd_kafka_dbg(rk, GENERIC, "TERMINATE", |
| "Main background thread exiting"); |
| |
| rd_atomic32_sub(&rd_kafka_thread_cnt_curr, 1); |
| |
| return 0; |
| } |
| |
| |
| static void rd_kafka_term_sig_handler (int sig) { |
| /* nop */ |
| } |
| |
| |
| rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *app_conf, |
| char *errstr, size_t errstr_size) { |
| rd_kafka_t *rk; |
| static rd_atomic32_t rkid; |
| rd_kafka_conf_t *conf; |
| rd_kafka_resp_err_t ret_err = RD_KAFKA_RESP_ERR_NO_ERROR; |
| int ret_errno = 0; |
| #ifndef _MSC_VER |
| sigset_t newset, oldset; |
| #endif |
| |
| call_once(&rd_kafka_global_init_once, rd_kafka_global_init); |
| |
| /* rd_kafka_new() takes ownership of the provided \p app_conf |
| * object if rd_kafka_new() succeeds. |
| * Since \p app_conf is optional we allocate a default configuration |
| * object here if \p app_conf is NULL. |
| * The configuration object itself is struct-copied later |
| * leaving the default *conf pointer to be ready for freeing. |
| * In case new() fails and app_conf was specified we will clear out |
| * rk_conf to avoid double-freeing from destroy_internal() and the |
| * user's eventual call to rd_kafka_conf_destroy(). |
| * This is all a bit tricky but that's the nature of |
| * legacy interfaces. */ |
| if (!app_conf) |
| conf = rd_kafka_conf_new(); |
| else |
| conf = app_conf; |
| |
| /* Verify mandatory configuration */ |
| if (!conf->socket_cb) { |
| rd_snprintf(errstr, errstr_size, |
| "Mandatory config property 'socket_cb' not set"); |
| if (!app_conf) |
| rd_kafka_conf_destroy(conf); |
| rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL); |
| return NULL; |
| } |
| |
| if (!conf->open_cb) { |
| rd_snprintf(errstr, errstr_size, |
| "Mandatory config property 'open_cb' not set"); |
| if (!app_conf) |
| rd_kafka_conf_destroy(conf); |
| rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL); |
| return NULL; |
| } |
| |
| if (conf->metadata_max_age_ms == -1) { |
| if (conf->metadata_refresh_interval_ms > 0) |
| conf->metadata_max_age_ms = |
| conf->metadata_refresh_interval_ms * 3; |
| else /* use default value of refresh * 3 */ |
| conf->metadata_max_age_ms = 5*60*1000 * 3; |
| } |
| |
| rd_kafka_global_cnt_incr(); |
| |
| /* |
| * Set up the handle. |
| */ |
| rk = rd_calloc(1, sizeof(*rk)); |
| |
| rk->rk_type = type; |
| |
| /* Struct-copy the config object. */ |
| rk->rk_conf = *conf; |
| if (!app_conf) |
| rd_free(conf); /* Free the base config struct only, |
| * not its fields since they were copied to |
| * rk_conf just above. Those fields are |
| * freed from rd_kafka_destroy_internal() |
| * as the rk itself is destroyed. */ |
| |
| /* Call on_new() interceptors */ |
| rd_kafka_interceptors_on_new(rk, &rk->rk_conf); |
| |
| rwlock_init(&rk->rk_lock); |
| mtx_init(&rk->rk_internal_rkb_lock, mtx_plain); |
| |
| cnd_init(&rk->rk_broker_state_change_cnd); |
| mtx_init(&rk->rk_broker_state_change_lock, mtx_plain); |
| |
| rk->rk_rep = rd_kafka_q_new(rk); |
| rk->rk_ops = rd_kafka_q_new(rk); |
| rk->rk_ops->rkq_serve = rd_kafka_poll_cb; |
| rk->rk_ops->rkq_opaque = rk; |
| |
| if (rk->rk_conf.log_queue) { |
| rk->rk_logq = rd_kafka_q_new(rk); |
| rk->rk_logq->rkq_serve = rd_kafka_poll_cb; |
| rk->rk_logq->rkq_opaque = rk; |
| } |
| |
| TAILQ_INIT(&rk->rk_brokers); |
| TAILQ_INIT(&rk->rk_topics); |
| rd_kafka_timers_init(&rk->rk_timers, rk); |
| rd_kafka_metadata_cache_init(rk); |
| |
| if (rk->rk_conf.dr_cb || rk->rk_conf.dr_msg_cb) |
| rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_DR; |
| if (rk->rk_conf.rebalance_cb) |
| rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_REBALANCE; |
| if (rk->rk_conf.offset_commit_cb) |
| rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_OFFSET_COMMIT; |
| |
| /* Convenience Kafka protocol null bytes */ |
| rk->rk_null_bytes = rd_kafkap_bytes_new(NULL, 0); |
| |
| if (rk->rk_conf.debug) |
| rk->rk_conf.log_level = LOG_DEBUG; |
| |
| rd_snprintf(rk->rk_name, sizeof(rk->rk_name), "%s#%s-%i", |
| rk->rk_conf.client_id_str, rd_kafka_type2str(rk->rk_type), |
| rd_atomic32_add(&rkid, 1)); |
| |
| /* Construct clientid kafka string */ |
| rk->rk_client_id = rd_kafkap_str_new(rk->rk_conf.client_id_str,-1); |
| |
| /* Convert group.id to kafka string (may be NULL) */ |
| rk->rk_group_id = rd_kafkap_str_new(rk->rk_conf.group_id_str,-1); |
| |
| /* Config fixups */ |
| rk->rk_conf.queued_max_msg_bytes = |
| (int64_t)rk->rk_conf.queued_max_msg_kbytes * 1000ll; |
| |
| /* Enable api.version.request=true if fallback.broker.version |
| * indicates a supporting broker. */ |
| if (rd_kafka_ApiVersion_is_queryable(rk->rk_conf.broker_version_fallback)) |
| rk->rk_conf.api_version_request = 1; |
| |
| if (rk->rk_type == RD_KAFKA_PRODUCER) { |
| mtx_init(&rk->rk_curr_msgs.lock, mtx_plain); |
| cnd_init(&rk->rk_curr_msgs.cnd); |
| rk->rk_curr_msgs.max_cnt = |
| rk->rk_conf.queue_buffering_max_msgs; |
| if ((unsigned long long)rk->rk_conf.queue_buffering_max_kbytes * 1024 > |
| (unsigned long long)SIZE_MAX) |
| rk->rk_curr_msgs.max_size = SIZE_MAX; |
| else |
| rk->rk_curr_msgs.max_size = |
| (size_t)rk->rk_conf.queue_buffering_max_kbytes * 1024; |
| } |
| |
| if (rd_kafka_assignors_init(rk, errstr, errstr_size) == -1) { |
| ret_err = RD_KAFKA_RESP_ERR__INVALID_ARG; |
| ret_errno = EINVAL; |
| goto fail; |
| } |
| |
| if (rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_SSL || |
| rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_PLAINTEXT) { |
| if (rd_kafka_sasl_select_provider(rk, |
| errstr, errstr_size) == -1) { |
| ret_err = RD_KAFKA_RESP_ERR__INVALID_ARG; |
| ret_errno = EINVAL; |
| goto fail; |
| } |
| } |
| |
| #if WITH_SSL |
| if (rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SSL || |
| rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_SSL) { |
| /* Create SSL context */ |
| if (rd_kafka_transport_ssl_ctx_init(rk, errstr, |
| errstr_size) == -1) { |
| ret_err = RD_KAFKA_RESP_ERR__INVALID_ARG; |
| ret_errno = EINVAL; |
| goto fail; |
| } |
| } |
| #endif |
| |
| /* Client group, eligible both in consumer and producer mode. */ |
| if (type == RD_KAFKA_CONSUMER && |
| RD_KAFKAP_STR_LEN(rk->rk_group_id) > 0) |
| rk->rk_cgrp = rd_kafka_cgrp_new(rk, |
| rk->rk_group_id, |
| rk->rk_client_id); |
| |
| |
| |
| #ifndef _MSC_VER |
| /* Block all signals in newly created thread. |
| * To avoid race condition we block all signals in the calling |
| * thread, which the new thread will inherit its sigmask from, |
| * and then restore the original sigmask of the calling thread when |
| * we're done creating the thread. */ |
| sigemptyset(&oldset); |
| sigfillset(&newset); |
| if (rk->rk_conf.term_sig) { |
| struct sigaction sa_term = { |
| .sa_handler = rd_kafka_term_sig_handler |
| }; |
| sigaction(rk->rk_conf.term_sig, &sa_term, NULL); |
| } |
| pthread_sigmask(SIG_SETMASK, &newset, &oldset); |
| #endif |
| |
| /* Lock handle here to synchronise state, i.e., hold off |
| * the thread until we've finalized the handle. */ |
| rd_kafka_wrlock(rk); |
| |
| /* Create handler thread */ |
| if ((thrd_create(&rk->rk_thread, |
| rd_kafka_thread_main, rk)) != thrd_success) { |
| ret_err = RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE; |
| ret_errno = errno; |
| if (errstr) |
| rd_snprintf(errstr, errstr_size, |
| "Failed to create thread: %s (%i)", |
| rd_strerror(errno), errno); |
| rd_kafka_wrunlock(rk); |
| #ifndef _MSC_VER |
| /* Restore sigmask of caller */ |
| pthread_sigmask(SIG_SETMASK, &oldset, NULL); |
| #endif |
| goto fail; |
| } |
| |
| rd_kafka_wrunlock(rk); |
| |
| rk->rk_eos.PID = -1; |
| rk->rk_eos.TransactionalId = rd_kafkap_str_new(NULL, 0); |
| |
| mtx_lock(&rk->rk_internal_rkb_lock); |
| rk->rk_internal_rkb = rd_kafka_broker_add(rk, RD_KAFKA_INTERNAL, |
| RD_KAFKA_PROTO_PLAINTEXT, |
| "", 0, RD_KAFKA_NODEID_UA); |
| mtx_unlock(&rk->rk_internal_rkb_lock); |
| |
| /* Add initial list of brokers from configuration */ |
| if (rk->rk_conf.brokerlist) { |
| if (rd_kafka_brokers_add0(rk, rk->rk_conf.brokerlist) == 0) |
| rd_kafka_op_err(rk, RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN, |
| "No brokers configured"); |
| } |
| |
| #ifndef _MSC_VER |
| /* Restore sigmask of caller */ |
| pthread_sigmask(SIG_SETMASK, &oldset, NULL); |
| #endif |
| |
| /* Free user supplied conf's base pointer on success, |
| * but not the actual allocated fields since the struct |
| * will have been copied in its entirety above. */ |
| if (app_conf) |
| rd_free(app_conf); |
| rd_kafka_set_last_error(0, 0); |
| |
| rk->rk_initialized = 1; |
| |
| return rk; |
| |
| fail: |
| /* |
| * Error out and clean up |
| */ |
| |
| /* If on_new() interceptors have been called we also need |
| * to allow interceptor clean-up by calling on_destroy() */ |
| rd_kafka_interceptors_on_destroy(rk); |
| |
| /* If rk_conf is a struct-copy of the application configuration |
| * we need to avoid rk_conf fields from being freed from |
| * rd_kafka_destroy_internal() since they belong to app_conf. |
| * However, there are some internal fields, such as interceptors, |
| * that belong to rk_conf and thus needs to be cleaned up. |
| * Legacy APIs, sigh.. */ |
| if (app_conf) { |
| rd_kafka_assignors_term(rk); |
| rd_kafka_interceptors_destroy(&rk->rk_conf); |
| memset(&rk->rk_conf, 0, sizeof(rk->rk_conf)); |
| } |
| |
| rd_atomic32_add(&rk->rk_terminate, 1); |
| rd_kafka_destroy_internal(rk); |
| rd_kafka_destroy_final(rk); |
| |
| rd_kafka_set_last_error(ret_err, ret_errno); |
| |
| return NULL; |
| } |
| |
| |
| |
| |
| |
| /** |
| * Produce a single message. |
| * Locality: any application thread |
| */ |
| int rd_kafka_produce (rd_kafka_topic_t *rkt, int32_t partition, |
| int msgflags, |
| void *payload, size_t len, |
| const void *key, size_t keylen, |
| void *msg_opaque) { |
| return rd_kafka_msg_new(rd_kafka_topic_a2i(rkt), partition, |
| msgflags, payload, len, |
| key, keylen, msg_opaque); |
| } |
| |
| |
| /** |
| * Counts usage of the legacy/simple consumer (rd_kafka_consume_start() with |
| * friends) since it does not have an API for stopping the cgrp we will need to |
| * sort that out automatically in the background when all consumption |
| * has stopped. |
| * |
| * Returns 0 if a High level consumer is already instantiated |
| * which means a Simple consumer cannot co-operate with it, else 1. |
| * |
| * A rd_kafka_t handle can never migrate from simple to high-level, or |
| * vice versa, so we dont need a ..consumer_del(). |
| */ |
| int rd_kafka_simple_consumer_add (rd_kafka_t *rk) { |
| if (rd_atomic32_get(&rk->rk_simple_cnt) < 0) |
| return 0; |
| |
| return (int)rd_atomic32_add(&rk->rk_simple_cnt, 1); |
| } |
| |
| |
| |
| |
| /** |
| * rktp fetch is split up in these parts: |
| * * application side: |
| * * broker side (handled by current leader broker thread for rktp): |
| * - the fetch state, initial offset, etc. |
| * - fetching messages, updating fetched offset, etc. |
| * - offset commits |
| * |
| * Communication between the two are: |
| * app side -> rdkafka main side: rktp_ops |
| * broker thread -> app side: rktp_fetchq |
| * |
| * There is no shared state between these threads, instead |
| * state is communicated through the two op queues, and state synchronization |
| * is performed by version barriers. |
| * |
| */ |
| |
| static RD_UNUSED |
| int rd_kafka_consume_start0 (rd_kafka_itopic_t *rkt, int32_t partition, |
| int64_t offset, rd_kafka_q_t *rkq) { |
| shptr_rd_kafka_toppar_t *s_rktp; |
| |
| if (partition < 0) { |
| rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION, |
| ESRCH); |
| return -1; |
| } |
| |
| if (!rd_kafka_simple_consumer_add(rkt->rkt_rk)) { |
| rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL); |
| return -1; |
| } |
| |
| rd_kafka_topic_wrlock(rkt); |
| s_rktp = rd_kafka_toppar_desired_add(rkt, partition); |
| rd_kafka_topic_wrunlock(rkt); |
| |
| /* Verify offset */ |
| if (offset == RD_KAFKA_OFFSET_BEGINNING || |
| offset == RD_KAFKA_OFFSET_END || |
| offset <= RD_KAFKA_OFFSET_TAIL_BASE) { |
| /* logical offsets */ |
| |
| } else if (offset == RD_KAFKA_OFFSET_STORED) { |
| /* offset manager */ |
| |
| if (rkt->rkt_conf.offset_store_method == |
| RD_KAFKA_OFFSET_METHOD_BROKER && |
| RD_KAFKAP_STR_IS_NULL(rkt->rkt_rk->rk_group_id)) { |
| /* Broker based offsets require a group id. */ |
| rd_kafka_toppar_destroy(s_rktp); |
| rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, |
| EINVAL); |
| return -1; |
| } |
| |
| } else if (offset < 0) { |
| rd_kafka_toppar_destroy(s_rktp); |
| rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, |
| EINVAL); |
| return -1; |
| |
| } |
| |
| rd_kafka_toppar_op_fetch_start(rd_kafka_toppar_s2i(s_rktp), offset, |
| rkq, RD_KAFKA_NO_REPLYQ); |
| |
| rd_kafka_toppar_destroy(s_rktp); |
| |
| rd_kafka_set_last_error(0, 0); |
| return 0; |
| } |
| |
| |
| |
| |
| int rd_kafka_consume_start (rd_kafka_topic_t *app_rkt, int32_t partition, |
| int64_t offset) { |
| rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt); |
| rd_kafka_dbg(rkt->rkt_rk, TOPIC, "START", |
| "Start consuming partition %"PRId32,partition); |
| return rd_kafka_consume_start0(rkt, partition, offset, NULL); |
| } |
| |
| int rd_kafka_consume_start_queue (rd_kafka_topic_t *app_rkt, int32_t partition, |
| int64_t offset, rd_kafka_queue_t *rkqu) { |
| rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt); |
| |
| return rd_kafka_consume_start0(rkt, partition, offset, rkqu->rkqu_q); |
| } |
| |
| |
| |
| |
| static RD_UNUSED int rd_kafka_consume_stop0 (rd_kafka_toppar_t *rktp) { |
| rd_kafka_q_t *tmpq = NULL; |
| rd_kafka_resp_err_t err; |
| |
| rd_kafka_topic_wrlock(rktp->rktp_rkt); |
| rd_kafka_toppar_lock(rktp); |
| rd_kafka_toppar_desired_del(rktp); |
| rd_kafka_toppar_unlock(rktp); |
| rd_kafka_topic_wrunlock(rktp->rktp_rkt); |
| |
| tmpq = rd_kafka_q_new(rktp->rktp_rkt->rkt_rk); |
| |
| rd_kafka_toppar_op_fetch_stop(rktp, RD_KAFKA_REPLYQ(tmpq, 0)); |
| |
| /* Synchronisation: Wait for stop reply from broker thread */ |
| err = rd_kafka_q_wait_result(tmpq, RD_POLL_INFINITE); |
| rd_kafka_q_destroy(tmpq); |
| |
| rd_kafka_set_last_error(err, err ? EINVAL : 0); |
| |
| return err ? -1 : 0; |
| } |
| |
| |
| int rd_kafka_consume_stop (rd_kafka_topic_t *app_rkt, int32_t partition) { |
| rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt); |
| shptr_rd_kafka_toppar_t *s_rktp; |
| int r; |
| |
| if (partition == RD_KAFKA_PARTITION_UA) { |
| rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL); |
| return -1; |
| } |
| |
| rd_kafka_topic_wrlock(rkt); |
| if (!(s_rktp = rd_kafka_toppar_get(rkt, partition, 0)) && |
| !(s_rktp = rd_kafka_toppar_desired_get(rkt, partition))) { |
| rd_kafka_topic_wrunlock(rkt); |
| rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION, |
| ESRCH); |
| return -1; |
| } |
| rd_kafka_topic_wrunlock(rkt); |
| |
| r = rd_kafka_consume_stop0(rd_kafka_toppar_s2i(s_rktp)); |
| /* set_last_error() called by stop0() */ |
| |
| rd_kafka_toppar_destroy(s_rktp); |
| |
| return r; |
| } |
| |
| |
| |
| rd_kafka_resp_err_t rd_kafka_seek (rd_kafka_topic_t *app_rkt, |
| int32_t partition, |
| int64_t offset, |
| int timeout_ms) { |
| rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt); |
| shptr_rd_kafka_toppar_t *s_rktp; |
| rd_kafka_toppar_t *rktp; |
| rd_kafka_q_t *tmpq = NULL; |
| rd_kafka_resp_err_t err; |
| |
| /* FIXME: simple consumer check */ |
| |
| if (partition == RD_KAFKA_PARTITION_UA) |
| return RD_KAFKA_RESP_ERR__INVALID_ARG; |
| |
| rd_kafka_topic_rdlock(rkt); |
| if (!(s_rktp = rd_kafka_toppar_get(rkt, partition, 0)) && |
| !(s_rktp = rd_kafka_toppar_desired_get(rkt, partition))) { |
| rd_kafka_topic_rdunlock(rkt); |
| return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION; |
| } |
| rd_kafka_topic_rdunlock(rkt); |
| |
| if (timeout_ms) |
| tmpq = rd_kafka_q_new(rkt->rkt_rk); |
| |
| rktp = rd_kafka_toppar_s2i(s_rktp); |
| if ((err = rd_kafka_toppar_op_seek(rktp, offset, |
| RD_KAFKA_REPLYQ(tmpq, 0)))) { |
| if (tmpq) |
| rd_kafka_q_destroy(tmpq); |
| rd_kafka_toppar_destroy(s_rktp); |
| return err; |
| } |
| |
| rd_kafka_toppar_destroy(s_rktp); |
| |
| if (tmpq) { |
| err = rd_kafka_q_wait_result(tmpq, timeout_ms); |
| rd_kafka_q_destroy(tmpq); |
| return err; |
| } |
| |
| return RD_KAFKA_RESP_ERR_NO_ERROR; |
| } |
| |
| |
| |
| static ssize_t rd_kafka_consume_batch0 (rd_kafka_q_t *rkq, |
| int timeout_ms, |
| rd_kafka_message_t **rkmessages, |
| size_t rkmessages_size) { |
| /* Populate application's rkmessages array. */ |
| return rd_kafka_q_serve_rkmessages(rkq, timeout_ms, |
| rkmessages, rkmessages_size); |
| } |
| |
| |
| ssize_t rd_kafka_consume_batch (rd_kafka_topic_t *app_rkt, int32_t partition, |
| int timeout_ms, |
| rd_kafka_message_t **rkmessages, |
| size_t rkmessages_size) { |
| rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt); |
| shptr_rd_kafka_toppar_t *s_rktp; |
| rd_kafka_toppar_t *rktp; |
| ssize_t cnt; |
| |
| /* Get toppar */ |
| rd_kafka_topic_rdlock(rkt); |
| s_rktp = rd_kafka_toppar_get(rkt, partition, 0/*no ua on miss*/); |
| if (unlikely(!s_rktp)) |
| s_rktp = rd_kafka_toppar_desired_get(rkt, partition); |
| rd_kafka_topic_rdunlock(rkt); |
| |
| if (unlikely(!s_rktp)) { |
| /* No such toppar known */ |
| rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION, |
| ESRCH); |
| return -1; |
| } |
| |
| rktp = rd_kafka_toppar_s2i(s_rktp); |
| |
| /* Populate application's rkmessages array. */ |
| cnt = rd_kafka_q_serve_rkmessages(rktp->rktp_fetchq, timeout_ms, |
| rkmessages, rkmessages_size); |
| |
| rd_kafka_toppar_destroy(s_rktp); /* refcnt from .._get() */ |
| |
| rd_kafka_set_last_error(0, 0); |
| |
| return cnt; |
| } |
| |
| ssize_t rd_kafka_consume_batch_queue (rd_kafka_queue_t *rkqu, |
| int timeout_ms, |
| rd_kafka_message_t **rkmessages, |
| size_t rkmessages_size) { |
| /* Populate application's rkmessages array. */ |
| return rd_kafka_consume_batch0(rkqu->rkqu_q, timeout_ms, |
| rkmessages, rkmessages_size); |
| } |
| |
| |
| struct consume_ctx { |
| void (*consume_cb) (rd_kafka_message_t *rkmessage, void *opaque); |
| void *opaque; |
| }; |
| |
| |
| /** |
| * Trampoline for application's consume_cb() |
| */ |
| static rd_kafka_op_res_t |
| rd_kafka_consume_cb (rd_kafka_t *rk, |
| rd_kafka_q_t *rkq, |
| rd_kafka_op_t *rko, |
| rd_kafka_q_cb_type_t cb_type, void *opaque) { |
| struct consume_ctx *ctx = opaque; |
| rd_kafka_message_t *rkmessage; |
| |
| if (unlikely(rd_kafka_op_version_outdated(rko, 0))) { |
| rd_kafka_op_destroy(rko); |
| return RD_KAFKA_OP_RES_HANDLED; |
| } |
| |
| rkmessage = rd_kafka_message_get(rko); |
| |
| rd_kafka_op_offset_store(rk, rko, rkmessage); |
| |
| ctx->consume_cb(rkmessage, ctx->opaque); |
| |
| rd_kafka_op_destroy(rko); |
| |
| return RD_KAFKA_OP_RES_HANDLED; |
| } |
| |
| |
| |
| static rd_kafka_op_res_t |
| rd_kafka_consume_callback0 (rd_kafka_q_t *rkq, int timeout_ms, int max_cnt, |
| void (*consume_cb) (rd_kafka_message_t |
| *rkmessage, |
| void *opaque), |
| void *opaque) { |
| struct consume_ctx ctx = { .consume_cb = consume_cb, .opaque = opaque }; |
| return rd_kafka_q_serve(rkq, timeout_ms, max_cnt, |
| RD_KAFKA_Q_CB_RETURN, |
| rd_kafka_consume_cb, &ctx); |
| |
| } |
| |
| |
| int rd_kafka_consume_callback (rd_kafka_topic_t *app_rkt, int32_t partition, |
| int timeout_ms, |
| void (*consume_cb) (rd_kafka_message_t |
| *rkmessage, |
| void *opaque), |
| void *opaque) { |
| rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt); |
| shptr_rd_kafka_toppar_t *s_rktp; |
| rd_kafka_toppar_t *rktp; |
| int r; |
| |
| /* Get toppar */ |
| rd_kafka_topic_rdlock(rkt); |
| s_rktp = rd_kafka_toppar_get(rkt, partition, 0/*no ua on miss*/); |
| if (unlikely(!s_rktp)) |
| s_rktp = rd_kafka_toppar_desired_get(rkt, partition); |
| rd_kafka_topic_rdunlock(rkt); |
| |
| if (unlikely(!s_rktp)) { |
| /* No such toppar known */ |
| rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION, |
| ESRCH); |
| return -1; |
| } |
| |
| rktp = rd_kafka_toppar_s2i(s_rktp); |
| r = rd_kafka_consume_callback0(rktp->rktp_fetchq, timeout_ms, |
| rkt->rkt_conf.consume_callback_max_msgs, |
| consume_cb, opaque); |
| |
| rd_kafka_toppar_destroy(s_rktp); |
| |
| rd_kafka_set_last_error(0, 0); |
| |
| return r; |
| } |
| |
| |
| |
| int rd_kafka_consume_callback_queue (rd_kafka_queue_t *rkqu, |
| int timeout_ms, |
| void (*consume_cb) (rd_kafka_message_t |
| *rkmessage, |
| void *opaque), |
| void *opaque) { |
| return rd_kafka_consume_callback0(rkqu->rkqu_q, timeout_ms, 0, |
| consume_cb, opaque); |
| } |
| |
| |
| /** |
| * Serve queue 'rkq' and return one message. |
| * By serving the queue it will also call any registered callbacks |
| * registered for matching events, this includes consumer_cb() |
| * in which case no message will be returned. |
| */ |
| static rd_kafka_message_t *rd_kafka_consume0 (rd_kafka_t *rk, |
| rd_kafka_q_t *rkq, |
| int timeout_ms) { |
| rd_kafka_op_t *rko; |
| rd_kafka_message_t *rkmessage = NULL; |
| rd_ts_t abs_timeout = rd_timeout_init(timeout_ms); |
| |
| rd_kafka_yield_thread = 0; |
| while ((rko = rd_kafka_q_pop(rkq, |
| rd_timeout_remains(abs_timeout), 0))) { |
| rd_kafka_op_res_t res; |
| |
| res = rd_kafka_poll_cb(rk, rkq, rko, |
| RD_KAFKA_Q_CB_RETURN, NULL); |
| |
| if (res == RD_KAFKA_OP_RES_PASS) |
| break; |
| |
| if (unlikely(res == RD_KAFKA_OP_RES_YIELD || |
| rd_kafka_yield_thread)) { |
| /* Callback called rd_kafka_yield(), we must |
| * stop dispatching the queue and return. */ |
| rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INTR, |
| EINTR); |
| return NULL; |
| } |
| |
| /* Message was handled by callback. */ |
| continue; |
| } |
| |
| if (!rko) { |
| /* Timeout reached with no op returned. */ |
| rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__TIMED_OUT, |
| ETIMEDOUT); |
| return NULL; |
| } |
| |
| rd_kafka_assert(rk, |
| rko->rko_type == RD_KAFKA_OP_FETCH || |
| rko->rko_type == RD_KAFKA_OP_CONSUMER_ERR); |
| |
| /* Get rkmessage from rko */ |
| rkmessage = rd_kafka_message_get(rko); |
| |
| /* Store offset */ |
| rd_kafka_op_offset_store(rk, rko, rkmessage); |
| |
| rd_kafka_set_last_error(0, 0); |
| |
| return rkmessage; |
| } |
| |
| rd_kafka_message_t *rd_kafka_consume (rd_kafka_topic_t *app_rkt, |
| int32_t partition, |
| int timeout_ms) { |
| rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt); |
| shptr_rd_kafka_toppar_t *s_rktp; |
| rd_kafka_toppar_t *rktp; |
| rd_kafka_message_t *rkmessage; |
| |
| rd_kafka_topic_rdlock(rkt); |
| s_rktp = rd_kafka_toppar_get(rkt, partition, 0/*no ua on miss*/); |
| if (unlikely(!s_rktp)) |
| s_rktp = rd_kafka_toppar_desired_get(rkt, partition); |
| rd_kafka_topic_rdunlock(rkt); |
| |
| if (unlikely(!s_rktp)) { |
| /* No such toppar known */ |
| rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION, |
| ESRCH); |
| return NULL; |
| } |
| |
| rktp = rd_kafka_toppar_s2i(s_rktp); |
| rkmessage = rd_kafka_consume0(rkt->rkt_rk, |
| rktp->rktp_fetchq, timeout_ms); |
| |
| rd_kafka_toppar_destroy(s_rktp); /* refcnt from .._get() */ |
| |
| return rkmessage; |
| } |
| |
| |
| rd_kafka_message_t *rd_kafka_consume_queue (rd_kafka_queue_t *rkqu, |
| int timeout_ms) { |
| return rd_kafka_consume0(rkqu->rkqu_rk, rkqu->rkqu_q, timeout_ms); |
| } |
| |
| |
| |
| |
| rd_kafka_resp_err_t rd_kafka_poll_set_consumer (rd_kafka_t *rk) { |
| rd_kafka_cgrp_t *rkcg; |
| |
| if (!(rkcg = rd_kafka_cgrp_get(rk))) |
| return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP; |
| |
| rd_kafka_q_fwd_set(rk->rk_rep, rkcg->rkcg_q); |
| return RD_KAFKA_RESP_ERR_NO_ERROR; |
| } |
| |
| |
| |
| |
| rd_kafka_message_t *rd_kafka_consumer_poll (rd_kafka_t *rk, |
| int timeout_ms) { |
| rd_kafka_cgrp_t *rkcg; |
| |
| if (unlikely(!(rkcg = rd_kafka_cgrp_get(rk)))) { |
| rd_kafka_message_t *rkmessage = rd_kafka_message_new(); |
| rkmessage->err = RD_KAFKA_RESP_ERR__UNKNOWN_GROUP; |
| return rkmessage; |
| } |
| |
| return rd_kafka_consume0(rk, rkcg->rkcg_q, timeout_ms); |
| } |
| |
| |
| rd_kafka_resp_err_t rd_kafka_consumer_close (rd_kafka_t *rk) { |
| rd_kafka_cgrp_t *rkcg; |
| rd_kafka_op_t *rko; |
| rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR__TIMED_OUT; |
| rd_kafka_q_t *rkq; |
| |
| if (!(rkcg = rd_kafka_cgrp_get(rk))) |
| return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP; |
| |
| /* Redirect cgrp queue to our temporary queue to make sure |
| * all posted ops (e.g., rebalance callbacks) are served by |
| * this function. */ |
| rkq = rd_kafka_q_new(rk); |
| rd_kafka_q_fwd_set(rkcg->rkcg_q, rkq); |
| |
| rd_kafka_cgrp_terminate(rkcg, RD_KAFKA_REPLYQ(rkq, 0)); /* async */ |
| |
| while ((rko = rd_kafka_q_pop(rkq, RD_POLL_INFINITE, 0))) { |
| rd_kafka_op_res_t res; |
| if ((rko->rko_type & ~RD_KAFKA_OP_FLAGMASK) == |
| RD_KAFKA_OP_TERMINATE) { |
| err = rko->rko_err; |
| rd_kafka_op_destroy(rko); |
| break; |
| } |
| res = rd_kafka_poll_cb(rk, rkq, rko, |
| RD_KAFKA_Q_CB_RETURN, NULL); |
| if (res == RD_KAFKA_OP_RES_PASS) |
| rd_kafka_op_destroy(rko); |
| /* Ignore YIELD, we need to finish */ |
| } |
| |
| rd_kafka_q_destroy(rkq); |
| |
| rd_kafka_q_fwd_set(rkcg->rkcg_q, NULL); |
| |
| return err; |
| } |
| |
| |
| |
| rd_kafka_resp_err_t |
| rd_kafka_committed (rd_kafka_t *rk, |
| rd_kafka_topic_partition_list_t *partitions, |
| int timeout_ms) { |
| rd_kafka_q_t *rkq; |
| rd_kafka_resp_err_t err; |
| rd_kafka_cgrp_t *rkcg; |
| rd_ts_t abs_timeout = rd_timeout_init(timeout_ms); |
| |
| if (!partitions) |
| return RD_KAFKA_RESP_ERR__INVALID_ARG; |
| |
| if (!(rkcg = rd_kafka_cgrp_get(rk))) |
| return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP; |
| |
| /* Set default offsets. */ |
| rd_kafka_topic_partition_list_reset_offsets(partitions, |
| RD_KAFKA_OFFSET_INVALID); |
| |
| rkq = rd_kafka_q_new(rk); |
| |
| do { |
| rd_kafka_op_t *rko; |
| int state_version = rd_kafka_brokers_get_state_version(rk); |
| |
| rko = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_FETCH); |
| rd_kafka_op_set_replyq(rko, rkq, NULL); |
| |
| /* Issue #827 |
| * Copy partition list to avoid use-after-free if we time out |
| * here, the app frees the list, and then cgrp starts |
| * processing the op. */ |
| rko->rko_u.offset_fetch.partitions = |
| rd_kafka_topic_partition_list_copy(partitions); |
| rko->rko_u.offset_fetch.do_free = 1; |
| |
| if (!rd_kafka_q_enq(rkcg->rkcg_ops, rko)) { |
| err = RD_KAFKA_RESP_ERR__DESTROY; |
| break; |
| } |
| |
| rko = rd_kafka_q_pop(rkq, rd_timeout_remains(abs_timeout), 0); |
| if (rko) { |
| if (!(err = rko->rko_err)) |
| rd_kafka_topic_partition_list_update( |
| partitions, |
| rko->rko_u.offset_fetch.partitions); |
| else if ((err == RD_KAFKA_RESP_ERR__WAIT_COORD || |
| err == RD_KAFKA_RESP_ERR__TRANSPORT) && |
| !rd_kafka_brokers_wait_state_change( |
| rk, state_version, |
| rd_timeout_remains(abs_timeout))) |
| err = RD_KAFKA_RESP_ERR__TIMED_OUT; |
| |
| rd_kafka_op_destroy(rko); |
| } else |
| err = RD_KAFKA_RESP_ERR__TIMED_OUT; |
| } while (err == RD_KAFKA_RESP_ERR__TRANSPORT || |
| err == RD_KAFKA_RESP_ERR__WAIT_COORD); |
| |
| rd_kafka_q_destroy(rkq); |
| |
| return err; |
| } |
| |
| |
| |
| rd_kafka_resp_err_t |
| rd_kafka_position (rd_kafka_t *rk, |
| rd_kafka_topic_partition_list_t *partitions) { |
| int i; |
| |
| /* Set default offsets. */ |
| rd_kafka_topic_partition_list_reset_offsets(partitions, |
| RD_KAFKA_OFFSET_INVALID); |
| |
| for (i = 0 ; i < partitions->cnt ; i++) { |
| rd_kafka_topic_partition_t *rktpar = &partitions->elems[i]; |
| shptr_rd_kafka_toppar_t *s_rktp; |
| rd_kafka_toppar_t *rktp; |
| |
| if (!(s_rktp = rd_kafka_toppar_get2(rk, rktpar->topic, |
| rktpar->partition, 0, 1))) { |
| rktpar->err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION; |
| rktpar->offset = RD_KAFKA_OFFSET_INVALID; |
| continue; |
| } |
| |
| rktp = rd_kafka_toppar_s2i(s_rktp); |
| rd_kafka_toppar_lock(rktp); |
| rktpar->offset = rktp->rktp_app_offset; |
| rktpar->err = RD_KAFKA_RESP_ERR_NO_ERROR; |
| rd_kafka_toppar_unlock(rktp); |
| rd_kafka_toppar_destroy(s_rktp); |
| } |
| |
| return RD_KAFKA_RESP_ERR_NO_ERROR; |
| } |
| |
| |
| |
| struct _query_wmark_offsets_state { |
| rd_kafka_resp_err_t err; |
| const char *topic; |
| int32_t partition; |
| int64_t offsets[2]; |
| int offidx; /* next offset to set from response */ |
| rd_ts_t ts_end; |
| int state_version; /* Broker state version */ |
| }; |
| |
| static void rd_kafka_query_wmark_offsets_resp_cb (rd_kafka_t *rk, |
| rd_kafka_broker_t *rkb, |
| rd_kafka_resp_err_t err, |
| rd_kafka_buf_t *rkbuf, |
| rd_kafka_buf_t *request, |
| void *opaque) { |
| struct _query_wmark_offsets_state *state = opaque; |
| rd_kafka_topic_partition_list_t *offsets; |
| rd_kafka_topic_partition_t *rktpar; |
| |
| offsets = rd_kafka_topic_partition_list_new(1); |
| err = rd_kafka_handle_Offset(rk, rkb, err, rkbuf, request, offsets); |
| if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS) { |
| rd_kafka_topic_partition_list_destroy(offsets); |
| return; /* Retrying */ |
| } |
| |
| /* Retry if no broker connection is available yet. */ |
| if ((err == RD_KAFKA_RESP_ERR__WAIT_COORD || |
| err == RD_KAFKA_RESP_ERR__TRANSPORT) && |
| rkb && |
| rd_kafka_brokers_wait_state_change( |
| rkb->rkb_rk, state->state_version, |
| rd_timeout_remains(state->ts_end))) { |
| /* Retry */ |
| state->state_version = rd_kafka_brokers_get_state_version(rk); |
| request->rkbuf_retries = 0; |
| if (rd_kafka_buf_retry(rkb, request)) { |
| rd_kafka_topic_partition_list_destroy(offsets); |
| return; /* Retry in progress */ |
| } |
| /* FALLTHRU */ |
| } |
| |
| /* Partition not seen in response. */ |
| if (!(rktpar = rd_kafka_topic_partition_list_find(offsets, |
| state->topic, |
| state->partition))) |
| err = RD_KAFKA_RESP_ERR__BAD_MSG; |
| else if (rktpar->err) |
| err = rktpar->err; |
| else |
| state->offsets[state->offidx] = rktpar->offset; |
| |
| state->offidx++; |
| |
| if (err || state->offidx == 2) /* Error or Done */ |
| state->err = err; |
| |
| rd_kafka_topic_partition_list_destroy(offsets); |
| } |
| |
| |
| rd_kafka_resp_err_t |
| rd_kafka_query_watermark_offsets (rd_kafka_t *rk, const char *topic, |
| int32_t partition, |
| int64_t *low, int64_t *high, int timeout_ms) { |
| rd_kafka_q_t *rkq; |
| struct _query_wmark_offsets_state state; |
| rd_ts_t ts_end = rd_timeout_init(timeout_ms); |
| rd_kafka_topic_partition_list_t *partitions; |
| rd_kafka_topic_partition_t *rktpar; |
| struct rd_kafka_partition_leader *leader; |
| rd_list_t leaders; |
| rd_kafka_resp_err_t err; |
| |
| partitions = rd_kafka_topic_partition_list_new(1); |
| rktpar = rd_kafka_topic_partition_list_add(partitions, |
| topic, partition); |
| |
| rd_list_init(&leaders, partitions->cnt, |
| (void *)rd_kafka_partition_leader_destroy); |
| |
| err = rd_kafka_topic_partition_list_query_leaders(rk, partitions, |
| &leaders, timeout_ms); |
| if (err) { |
| rd_list_destroy(&leaders); |
| rd_kafka_topic_partition_list_destroy(partitions); |
| return err; |
| } |
| |
| leader = rd_list_elem(&leaders, 0); |
| |
| rkq = rd_kafka_q_new(rk); |
| |
| /* Due to KAFKA-1588 we need to send a request for each wanted offset, |
| * in this case one for the low watermark and one for the high. */ |
| state.topic = topic; |
| state.partition = partition; |
| state.offsets[0] = RD_KAFKA_OFFSET_BEGINNING; |
| state.offsets[1] = RD_KAFKA_OFFSET_END; |
| state.offidx = 0; |
| state.err = RD_KAFKA_RESP_ERR__IN_PROGRESS; |
| state.ts_end = ts_end; |
| state.state_version = rd_kafka_brokers_get_state_version(rk); |
| |
| |
| rktpar->offset = RD_KAFKA_OFFSET_BEGINNING; |
| rd_kafka_OffsetRequest(leader->rkb, partitions, 0, |
| RD_KAFKA_REPLYQ(rkq, 0), |
| rd_kafka_query_wmark_offsets_resp_cb, |
| &state); |
| |
| rktpar->offset = RD_KAFKA_OFFSET_END; |
| rd_kafka_OffsetRequest(leader->rkb, partitions, 0, |
| RD_KAFKA_REPLYQ(rkq, 0), |
| rd_kafka_query_wmark_offsets_resp_cb, |
| &state); |
| |
| rd_kafka_topic_partition_list_destroy(partitions); |
| rd_list_destroy(&leaders); |
| |
| /* Wait for reply (or timeout) */ |
| while (state.err == RD_KAFKA_RESP_ERR__IN_PROGRESS && |
| rd_kafka_q_serve(rkq, 100, 0, RD_KAFKA_Q_CB_CALLBACK, |
| rd_kafka_poll_cb, NULL) != |
| RD_KAFKA_OP_RES_YIELD) |
| ; |
| |
| rd_kafka_q_destroy(rkq); |
| |
| if (state.err) |
| return state.err; |
| else if (state.offidx != 2) |
| return RD_KAFKA_RESP_ERR__FAIL; |
| |
| /* We are not certain about the returned order. */ |
| if (state.offsets[0] < state.offsets[1]) { |
| *low = state.offsets[0]; |
| *high = state.offsets[1]; |
| } else { |
| *low = state.offsets[1]; |
| *high = state.offsets[0]; |
| } |
| |
| /* If partition is empty only one offset (the last) will be returned. */ |
| if (*low < 0 && *high >= 0) |
| *low = *high; |
| |
| return RD_KAFKA_RESP_ERR_NO_ERROR; |
| } |
| |
| |
| rd_kafka_resp_err_t |
| rd_kafka_get_watermark_offsets (rd_kafka_t *rk, const char *topic, |
| int32_t partition, |
| int64_t *low, int64_t *high) { |
| shptr_rd_kafka_toppar_t *s_rktp; |
| rd_kafka_toppar_t *rktp; |
| |
| s_rktp = rd_kafka_toppar_get2(rk, topic, partition, 0, 1); |
| if (!s_rktp) |
| return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION; |
| rktp = rd_kafka_toppar_s2i(s_rktp); |
| |
| rd_kafka_toppar_lock(rktp); |
| *low = rktp->rktp_lo_offset; |
| *high = rktp->rktp_hi_offset; |
| rd_kafka_toppar_unlock(rktp); |
| |
| rd_kafka_toppar_destroy(s_rktp); |
| |
| return RD_KAFKA_RESP_ERR_NO_ERROR; |
| } |
| |
| |
| /** |
| * @brief get_offsets_for_times() state |
| */ |
| struct _get_offsets_for_times { |
| rd_kafka_topic_partition_list_t *results; |
| rd_kafka_resp_err_t err; |
| int wait_reply; |
| int state_version; |
| rd_ts_t ts_end; |
| }; |
| |
| /** |
| * @brief Handle OffsetRequest responses |
| */ |
| static void rd_kafka_get_offsets_for_times_resp_cb (rd_kafka_t *rk, |
| rd_kafka_broker_t *rkb, |
| rd_kafka_resp_err_t err, |
| rd_kafka_buf_t *rkbuf, |
| rd_kafka_buf_t *request, |
| void *opaque) { |
| struct _get_offsets_for_times *state = opaque; |
| |
| err = rd_kafka_handle_Offset(rk, rkb, err, rkbuf, request, |
| state->results); |
| if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS) |
| return; /* Retrying */ |
| |
| /* Retry if no broker connection is available yet. */ |
| if ((err == RD_KAFKA_RESP_ERR__WAIT_COORD || |
| err == RD_KAFKA_RESP_ERR__TRANSPORT) && |
| rkb && |
| rd_kafka_brokers_wait_state_change( |
| rkb->rkb_rk, state->state_version, |
| rd_timeout_remains(state->ts_end))) { |
| /* Retry */ |
| state->state_version = rd_kafka_brokers_get_state_version(rk); |
| request->rkbuf_retries = 0; |
| if (rd_kafka_buf_retry(rkb, request)) |
| return; /* Retry in progress */ |
| /* FALLTHRU */ |
| } |
| |
| if (err && !state->err) |
| state->err = err; |
| |
| state->wait_reply--; |
| } |
| |
| |
| rd_kafka_resp_err_t |
| rd_kafka_offsets_for_times (rd_kafka_t *rk, |
| rd_kafka_topic_partition_list_t *offsets, |
| int timeout_ms) { |
| rd_kafka_q_t *rkq; |
| struct _get_offsets_for_times state = RD_ZERO_INIT; |
| rd_ts_t ts_end = rd_timeout_init(timeout_ms); |
| rd_list_t leaders; |
| int i; |
| rd_kafka_resp_err_t err; |
| struct rd_kafka_partition_leader *leader; |
| |
| if (offsets->cnt == 0) |
| return RD_KAFKA_RESP_ERR__INVALID_ARG; |
| |
| rd_list_init(&leaders, offsets->cnt, |
| (void *)rd_kafka_partition_leader_destroy); |
| |
| err = rd_kafka_topic_partition_list_query_leaders(rk, offsets, &leaders, |
| timeout_ms); |
| if (err) { |
| rd_list_destroy(&leaders); |
| return err; |
| } |
| |
| |
| rkq = rd_kafka_q_new(rk); |
| |
| state.wait_reply = 0; |
| state.results = rd_kafka_topic_partition_list_new(offsets->cnt); |
| |
| /* For each leader send a request for its partitions */ |
| RD_LIST_FOREACH(leader, &leaders, i) { |
| state.wait_reply++; |
| rd_kafka_OffsetRequest(leader->rkb, leader->partitions, 1, |
| RD_KAFKA_REPLYQ(rkq, 0), |
| rd_kafka_get_offsets_for_times_resp_cb, |
| &state); |
| } |
| |
| rd_list_destroy(&leaders); |
| |
| /* Wait for reply (or timeout) */ |
| while (state.wait_reply > 0 && rd_timeout_remains(ts_end) > 0) |
| rd_kafka_q_serve(rkq, rd_timeout_remains(ts_end), |
| 0, RD_KAFKA_Q_CB_CALLBACK, |
| rd_kafka_poll_cb, NULL); |
| |
| rd_kafka_q_destroy(rkq); |
| |
| /* Then update the queried partitions. */ |
| if (!state.err) |
| rd_kafka_topic_partition_list_update(offsets, state.results); |
| |
| rd_kafka_topic_partition_list_destroy(state.results); |
| |
| return state.err; |
| } |
| |
| |
| /** |
| * rd_kafka_poll() (and similar) op callback handler. |
| * Will either call registered callback depending on cb_type and op type |
| * or return op to application, if applicable (e.g., fetch message). |
| * |
| * Returns 1 if op was handled, else 0. |
| * |
| * Locality: application thread |
| */ |
| rd_kafka_op_res_t |
| rd_kafka_poll_cb (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko, |
| rd_kafka_q_cb_type_t cb_type, void *opaque) { |
| rd_kafka_msg_t *rkm; |
| |
| /* Return-as-event requested, see if op can be converted to event, |
| * otherwise fall through and trigger callbacks. */ |
| if (cb_type == RD_KAFKA_Q_CB_EVENT && rd_kafka_event_setup(rk, rko)) |
| return 0; /* Return as event */ |
| |
| switch ((int)rko->rko_type) |
| { |
| case RD_KAFKA_OP_FETCH: |
| if (!rk->rk_conf.consume_cb || |
| cb_type == RD_KAFKA_Q_CB_RETURN || |
| cb_type == RD_KAFKA_Q_CB_FORCE_RETURN) |
| return RD_KAFKA_OP_RES_PASS; /* Dont handle here */ |
| else { |
| struct consume_ctx ctx = { |
| .consume_cb = rk->rk_conf.consume_cb, |
| .opaque = rk->rk_conf.opaque }; |
| |
| return rd_kafka_consume_cb(rk, rkq, rko, cb_type, &ctx); |
| } |
| break; |
| |
| case RD_KAFKA_OP_REBALANCE: |
| /* If EVENT_REBALANCE is enabled but rebalance_cb isnt |
| * we need to perform a dummy assign for the application. |
| * This might happen during termination with consumer_close() */ |
| if (rk->rk_conf.rebalance_cb) |
| rk->rk_conf.rebalance_cb( |
| rk, rko->rko_err, |
| rko->rko_u.rebalance.partitions, |
| rk->rk_conf.opaque); |
| else { |
| rd_kafka_dbg(rk, CGRP, "UNASSIGN", |
| "Forcing unassign of %d partition(s)", |
| rko->rko_u.rebalance.partitions ? |
| rko->rko_u.rebalance.partitions->cnt : 0); |
| rd_kafka_assign(rk, NULL); |
| } |
| break; |
| |
| case RD_KAFKA_OP_OFFSET_COMMIT | RD_KAFKA_OP_REPLY: |
| if (!rko->rko_u.offset_commit.cb) |
| return RD_KAFKA_OP_RES_PASS; /* Dont handle here */ |
| rko->rko_u.offset_commit.cb( |
| rk, rko->rko_err, |
| rko->rko_u.offset_commit.partitions, |
| rko->rko_u.offset_commit.opaque); |
| break; |
| |
| case RD_KAFKA_OP_CONSUMER_ERR: |
| /* rd_kafka_consumer_poll() (_Q_CB_CONSUMER): |
| * Consumer errors are returned to the application |
| * as rkmessages, not error callbacks. |
| * |
| * rd_kafka_poll() (_Q_CB_GLOBAL): |
| * convert to ERR op (fallthru) |
| */ |
| if (cb_type == RD_KAFKA_Q_CB_RETURN || |
| cb_type == RD_KAFKA_Q_CB_FORCE_RETURN) { |
| /* return as message_t to application */ |
| return RD_KAFKA_OP_RES_PASS; |
| } |
| /* FALLTHRU */ |
| |
| case RD_KAFKA_OP_ERR: |
| if (rk->rk_conf.error_cb) |
| rk->rk_conf.error_cb(rk, rko->rko_err, |
| rko->rko_u.err.errstr, |
| rk->rk_conf.opaque); |
| else |
| rd_kafka_log(rk, LOG_ERR, "ERROR", |
| "%s: %s: %s", |
| rk->rk_name, |
| rd_kafka_err2str(rko->rko_err), |
| rko->rko_u.err.errstr); |
| break; |
| |
| case RD_KAFKA_OP_DR: |
| /* Delivery report: |
| * call application DR callback for each message. */ |
| while ((rkm = TAILQ_FIRST(&rko->rko_u.dr.msgq.rkmq_msgs))) { |
| rd_kafka_message_t *rkmessage; |
| |
| TAILQ_REMOVE(&rko->rko_u.dr.msgq.rkmq_msgs, |
| rkm, rkm_link); |
| |
| rkmessage = rd_kafka_message_get_from_rkm(rko, rkm); |
| |
| if (rk->rk_conf.dr_msg_cb) { |
| rk->rk_conf.dr_msg_cb(rk, rkmessage, |
| rk->rk_conf.opaque); |
| |
| } else { |
| |
| rk->rk_conf.dr_cb(rk, |
| rkmessage->payload, |
| rkmessage->len, |
| rkmessage->err, |
| rk->rk_conf.opaque, |
| rkmessage->_private); |
| } |
| |
| rd_kafka_msg_destroy(rk, rkm); |
| |
| if (unlikely(rd_kafka_yield_thread)) { |
| /* Callback called yield(), |
| * re-enqueue the op (if there are any |
| * remaining messages). */ |
| if (!TAILQ_EMPTY(&rko->rko_u.dr.msgq. |
| rkmq_msgs)) |
| rd_kafka_q_reenq(rkq, rko); |
| else |
| rd_kafka_op_destroy(rko); |
| return RD_KAFKA_OP_RES_YIELD; |
| } |
| } |
| |
| rd_kafka_msgq_init(&rko->rko_u.dr.msgq); |
| |
| break; |
| |
| case RD_KAFKA_OP_THROTTLE: |
| if (rk->rk_conf.throttle_cb) |
| rk->rk_conf.throttle_cb(rk, rko->rko_u.throttle.nodename, |
| rko->rko_u.throttle.nodeid, |
| rko->rko_u.throttle. |
| throttle_time, |
| rk->rk_conf.opaque); |
| break; |
| |
| case RD_KAFKA_OP_STATS: |
| /* Statistics */ |
| if (rk->rk_conf.stats_cb && |
| rk->rk_conf.stats_cb(rk, rko->rko_u.stats.json, |
| rko->rko_u.stats.json_len, |
| rk->rk_conf.opaque) == 1) |
| rko->rko_u.stats.json = NULL; /* Application wanted json ptr */ |
| break; |
| |
| case RD_KAFKA_OP_LOG: |
| if (likely(rk->rk_conf.log_cb && |
| rk->rk_conf.log_level >= rko->rko_u.log.level)) |
| rk->rk_conf.log_cb(rk, |
| rko->rko_u.log.level, |
| rko->rko_u.log.fac, |
| rko->rko_u.log.str); |
| break; |
| |
| case RD_KAFKA_OP_TERMINATE: |
| /* nop: just a wake-up */ |
| break; |
| |
| default: |
| rd_kafka_assert(rk, !*"cant handle op type"); |
| break; |
| } |
| |
| rd_kafka_op_destroy(rko); |
| |
| return 1; /* op was handled */ |
| } |
| |
| int rd_kafka_poll (rd_kafka_t *rk, int timeout_ms) { |
| return rd_kafka_q_serve(rk->rk_rep, timeout_ms, 0, |
| RD_KAFKA_Q_CB_CALLBACK, rd_kafka_poll_cb, NULL); |
| } |
| |
| |
| rd_kafka_event_t *rd_kafka_queue_poll (rd_kafka_queue_t *rkqu, int timeout_ms) { |
| rd_kafka_op_t *rko; |
| rko = rd_kafka_q_pop_serve(rkqu->rkqu_q, timeout_ms, 0, |
| RD_KAFKA_Q_CB_EVENT, rd_kafka_poll_cb, NULL); |
| if (!rko) |
| return NULL; |
| |
| return rko; |
| } |
| |
| int rd_kafka_queue_poll_callback (rd_kafka_queue_t *rkqu, int timeout_ms) { |
| return rd_kafka_q_serve(rkqu->rkqu_q, timeout_ms, 0, |
| RD_KAFKA_Q_CB_CALLBACK, rd_kafka_poll_cb, NULL); |
| } |
| |
| |
| |
| static void rd_kafka_toppar_dump (FILE *fp, const char *indent, |
| rd_kafka_toppar_t *rktp) { |
| |
| fprintf(fp, "%s%.*s [%"PRId32"] leader %s\n", |
| indent, |
| RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
| rktp->rktp_partition, |
| rktp->rktp_leader ? |
| rktp->rktp_leader->rkb_name : "none"); |
| fprintf(fp, |
| "%s refcnt %i\n" |
| "%s msgq: %i messages\n" |
| "%s xmit_msgq: %i messages\n" |
| "%s total: %"PRIu64" messages, %"PRIu64" bytes\n", |
| indent, rd_refcnt_get(&rktp->rktp_refcnt), |
| indent, rd_atomic32_get(&rktp->rktp_msgq.rkmq_msg_cnt), |
| indent, rd_atomic32_get(&rktp->rktp_xmit_msgq.rkmq_msg_cnt), |
| indent, rd_atomic64_get(&rktp->rktp_c.tx_msgs), rd_atomic64_get(&rktp->rktp_c.tx_bytes)); |
| } |
| |
| static void rd_kafka_broker_dump (FILE *fp, rd_kafka_broker_t *rkb, int locks) { |
| rd_kafka_toppar_t *rktp; |
| |
| if (locks) |
| rd_kafka_broker_lock(rkb); |
| fprintf(fp, " rd_kafka_broker_t %p: %s NodeId %"PRId32 |
| " in state %s (for %.3fs)\n", |
| rkb, rkb->rkb_name, rkb->rkb_nodeid, |
| rd_kafka_broker_state_names[rkb->rkb_state], |
| rkb->rkb_ts_state ? |
| (float)(rd_clock() - rkb->rkb_ts_state) / 1000000.0f : |
| 0.0f); |
| fprintf(fp, " refcnt %i\n", rd_refcnt_get(&rkb->rkb_refcnt)); |
| fprintf(fp, " outbuf_cnt: %i waitresp_cnt: %i\n", |
| rd_atomic32_get(&rkb->rkb_outbufs.rkbq_cnt), |
| rd_atomic32_get(&rkb->rkb_waitresps.rkbq_cnt)); |
| fprintf(fp, |
| " %"PRIu64 " messages sent, %"PRIu64" bytes, " |
| "%"PRIu64" errors, %"PRIu64" timeouts\n" |
| " %"PRIu64 " messages received, %"PRIu64" bytes, " |
| "%"PRIu64" errors\n" |
| " %"PRIu64 " messageset transmissions were retried\n", |
| rd_atomic64_get(&rkb->rkb_c.tx), rd_atomic64_get(&rkb->rkb_c.tx_bytes), |
| rd_atomic64_get(&rkb->rkb_c.tx_err), rd_atomic64_get(&rkb->rkb_c.req_timeouts), |
| rd_atomic64_get(&rkb->rkb_c.rx), rd_atomic64_get(&rkb->rkb_c.rx_bytes), |
| rd_atomic64_get(&rkb->rkb_c.rx_err), |
| rd_atomic64_get(&rkb->rkb_c.tx_retries)); |
| |
| fprintf(fp, " %i toppars:\n", rkb->rkb_toppar_cnt); |
| TAILQ_FOREACH(rktp, &rkb->rkb_toppars, rktp_rkblink) |
| rd_kafka_toppar_dump(fp, " ", rktp); |
| if (locks) { |
| rd_kafka_broker_unlock(rkb); |
| } |
| } |
| |
| |
| static void rd_kafka_dump0 (FILE *fp, rd_kafka_t *rk, int locks) { |
| rd_kafka_broker_t *rkb; |
| rd_kafka_itopic_t *rkt; |
| rd_kafka_toppar_t *rktp; |
| shptr_rd_kafka_toppar_t *s_rktp; |
| int i; |
| unsigned int tot_cnt; |
| size_t tot_size; |
| |
| rd_kafka_curr_msgs_get(rk, &tot_cnt, &tot_size); |
| |
| if (locks) |
| rd_kafka_rdlock(rk); |
| #if ENABLE_DEVEL |
| fprintf(fp, "rd_kafka_op_cnt: %d\n", rd_atomic32_get(&rd_kafka_op_cnt)); |
| #endif |
| fprintf(fp, "rd_kafka_t %p: %s\n", rk, rk->rk_name); |
| |
| fprintf(fp, " producer.msg_cnt %u (%"PRIusz" bytes)\n", |
| tot_cnt, tot_size); |
| fprintf(fp, " rk_rep reply queue: %i ops\n", |
| rd_kafka_q_len(rk->rk_rep)); |
| |
| fprintf(fp, " brokers:\n"); |
| if (locks) |
| mtx_lock(&rk->rk_internal_rkb_lock); |
| if (rk->rk_internal_rkb) |
| rd_kafka_broker_dump(fp, rk->rk_internal_rkb, locks); |
| if (locks) |
| mtx_unlock(&rk->rk_internal_rkb_lock); |
| |
| TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) { |
| rd_kafka_broker_dump(fp, rkb, locks); |
| } |
| |
| fprintf(fp, " cgrp:\n"); |
| if (rk->rk_cgrp) { |
| rd_kafka_cgrp_t *rkcg = rk->rk_cgrp; |
| fprintf(fp, " %.*s in state %s, flags 0x%x\n", |
| RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), |
| rd_kafka_cgrp_state_names[rkcg->rkcg_state], |
| rkcg->rkcg_flags); |
| fprintf(fp, " coord_id %"PRId32", managing broker %s\n", |
| rkcg->rkcg_coord_id, |
| rkcg->rkcg_rkb ? |
| rd_kafka_broker_name(rkcg->rkcg_rkb) : "(none)"); |
| |
| fprintf(fp, " toppars:\n"); |
| RD_LIST_FOREACH(s_rktp, &rkcg->rkcg_toppars, i) { |
| rktp = rd_kafka_toppar_s2i(s_rktp); |
| fprintf(fp, " %.*s [%"PRId32"] in state %s\n", |
| RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
| rktp->rktp_partition, |
| rd_kafka_fetch_states[rktp->rktp_fetch_state]); |
| } |
| } |
| |
| fprintf(fp, " topics:\n"); |
| TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) { |
| fprintf(fp, " %.*s with %"PRId32" partitions, state %s, " |
| "refcnt %i\n", |
| RD_KAFKAP_STR_PR(rkt->rkt_topic), |
| rkt->rkt_partition_cnt, |
| rd_kafka_topic_state_names[rkt->rkt_state], |
| rd_refcnt_get(&rkt->rkt_refcnt)); |
| if (rkt->rkt_ua) |
| rd_kafka_toppar_dump(fp, " ", |
| rd_kafka_toppar_s2i(rkt->rkt_ua)); |
| if (rd_list_empty(&rkt->rkt_desp)) { |
| fprintf(fp, " desired partitions:"); |
| RD_LIST_FOREACH(s_rktp, &rkt->rkt_desp, i) |
| fprintf(fp, " %"PRId32, |
| rd_kafka_toppar_s2i(s_rktp)-> |
| rktp_partition); |
| fprintf(fp, "\n"); |
| } |
| } |
| |
| fprintf(fp, "\n"); |
| rd_kafka_metadata_cache_dump(fp, rk); |
| |
| if (locks) |
| rd_kafka_rdunlock(rk); |
| } |
| |
| void rd_kafka_dump (FILE *fp, rd_kafka_t *rk) { |
| |
| if (rk) |
| rd_kafka_dump0(fp, rk, 1/*locks*/); |
| |
| #if ENABLE_SHAREDPTR_DEBUG |
| rd_shared_ptrs_dump(); |
| #endif |
| } |
| |
| |
| |
| const char *rd_kafka_name (const rd_kafka_t *rk) { |
| return rk->rk_name; |
| } |
| |
| rd_kafka_type_t rd_kafka_type(const rd_kafka_t *rk) { |
| return rk->rk_type; |
| } |
| |
| |
| char *rd_kafka_memberid (const rd_kafka_t *rk) { |
| rd_kafka_op_t *rko; |
| rd_kafka_cgrp_t *rkcg; |
| char *memberid; |
| |
| if (!(rkcg = rd_kafka_cgrp_get(rk))) |
| return NULL; |
| |
| rko = rd_kafka_op_req2(rkcg->rkcg_ops, RD_KAFKA_OP_NAME); |
| if (!rko) |
| return NULL; |
| memberid = rko->rko_u.name.str; |
| rko->rko_u.name.str = NULL; |
| rd_kafka_op_destroy(rko); |
| |
| return memberid; |
| } |
| |
| |
| char *rd_kafka_clusterid (rd_kafka_t *rk, int timeout_ms) { |
| rd_ts_t abs_timeout = rd_timeout_init(timeout_ms); |
| |
| /* ClusterId is returned in Metadata >=V2 responses and |
| * cached on the rk. If no cached value is available |
| * it means no metadata has been received yet, or we're |
| * using a lower protocol version |
| * (e.g., lack of api.version.request=true). */ |
| |
| while (1) { |
| int remains_ms; |
| |
| rd_kafka_rdlock(rk); |
| |
| if (rk->rk_clusterid) { |
| /* Cached clusterid available. */ |
| char *ret = rd_strdup(rk->rk_clusterid); |
| rd_kafka_rdunlock(rk); |
| return ret; |
| } else if (rk->rk_ts_metadata > 0) { |
| /* Metadata received but no clusterid, |
| * this probably means the broker is too old |
| * or api.version.request=false. */ |
| rd_kafka_rdunlock(rk); |
| return NULL; |
| } |
| |
| rd_kafka_rdunlock(rk); |
| |
| /* Wait for up to timeout_ms for a metadata refresh, |
| * if permitted by application. */ |
| remains_ms = rd_timeout_remains(abs_timeout); |
| if (remains_ms <= 0) |
| return NULL; |
| |
| rd_kafka_metadata_cache_wait_change( |
| rk, rd_timeout_remains(abs_timeout)); |
| } |
| |
| return NULL; |
| } |
| |
| |
| void *rd_kafka_opaque (const rd_kafka_t *rk) { |
| return rk->rk_conf.opaque; |
| } |
| |
| |
| int rd_kafka_outq_len (rd_kafka_t *rk) { |
| return rd_kafka_curr_msgs_cnt(rk) + rd_kafka_q_len(rk->rk_rep); |
| } |
| |
| |
| rd_kafka_resp_err_t rd_kafka_flush (rd_kafka_t *rk, int timeout_ms) { |
| unsigned int msg_cnt = 0; |
| int qlen; |
| rd_ts_t ts_end = rd_timeout_init(timeout_ms); |
| int tmout; |
| |
| if (rk->rk_type != RD_KAFKA_PRODUCER) |
| return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED; |
| |
| rd_kafka_yield_thread = 0; |
| while (((qlen = rd_kafka_q_len(rk->rk_rep)) > 0 || |
| (msg_cnt = rd_kafka_curr_msgs_cnt(rk)) > 0) && |
| !rd_kafka_yield_thread && |
| (tmout = rd_timeout_remains_limit(ts_end, 100))!=RD_POLL_NOWAIT) |
| rd_kafka_poll(rk, tmout); |
| |
| return qlen + msg_cnt > 0 ? RD_KAFKA_RESP_ERR__TIMED_OUT : |
| RD_KAFKA_RESP_ERR_NO_ERROR; |
| } |
| |
| |
| |
| int rd_kafka_version (void) { |
| return RD_KAFKA_VERSION; |
| } |
| |
| const char *rd_kafka_version_str (void) { |
| static char ret[128]; |
| size_t of = 0, r; |
| |
| if (*ret) |
| return ret; |
| |
| #ifdef LIBRDKAFKA_GIT_VERSION |
| if (*LIBRDKAFKA_GIT_VERSION) { |
| of = rd_snprintf(ret, sizeof(ret), "%s", |
| *LIBRDKAFKA_GIT_VERSION == 'v' ? |
| LIBRDKAFKA_GIT_VERSION+1 : |
| LIBRDKAFKA_GIT_VERSION); |
| if (of > sizeof(ret)) |
| of = sizeof(ret); |
| } |
| #endif |
| |
| #define _my_sprintf(...) do { \ |
| r = rd_snprintf(ret+of, sizeof(ret)-of, __VA_ARGS__); \ |
| if (r > sizeof(ret)-of) \ |
| r = sizeof(ret)-of; \ |
| of += r; \ |
| } while(0) |
| |
| if (of == 0) { |
| int ver = rd_kafka_version(); |
| int prel = (ver & 0xff); |
| _my_sprintf("%i.%i.%i", |
| (ver >> 24) & 0xff, |
| (ver >> 16) & 0xff, |
| (ver >> 8) & 0xff); |
| if (prel != 0xff) { |
| /* pre-builds below 200 are just running numbers, |
| * above 200 are RC numbers. */ |
| if (prel <= 200) |
| _my_sprintf("-pre%d", prel); |
| else |
| _my_sprintf("-RC%d", prel - 200); |
| } |
| } |
| |
| #if ENABLE_DEVEL |
| _my_sprintf("-devel"); |
| #endif |
| |
| #if ENABLE_SHAREDPTR_DEBUG |
| _my_sprintf("-shptr"); |
| #endif |
| |
| #if WITHOUT_OPTIMIZATION |
| _my_sprintf("-O0"); |
| #endif |
| |
| return ret; |
| } |
| |
| |
| /** |
| * Assert trampoline to print some debugging information on crash. |
| */ |
| void |
| RD_NORETURN |
| rd_kafka_crash (const char *file, int line, const char *function, |
| rd_kafka_t *rk, const char *reason) { |
| fprintf(stderr, "*** %s:%i:%s: %s ***\n", |
| file, line, function, reason); |
| if (rk) |
| rd_kafka_dump0(stderr, rk, 0/*no locks*/); |
| abort(); |
| } |
| |
| |
| |
| |
| |
| struct list_groups_state { |
| rd_kafka_q_t *q; |
| rd_kafka_resp_err_t err; |
| int wait_cnt; |
| const char *desired_group; |
| struct rd_kafka_group_list *grplist; |
| int grplist_size; |
| }; |
| |
| static void rd_kafka_DescribeGroups_resp_cb (rd_kafka_t *rk, |
| rd_kafka_broker_t *rkb, |
| rd_kafka_resp_err_t err, |
| rd_kafka_buf_t *reply, |
| rd_kafka_buf_t *request, |
| void *opaque) { |
| struct list_groups_state *state = opaque; |
| const int log_decode_errors = LOG_ERR; |
| int cnt; |
| |
| state->wait_cnt--; |
| |
| if (err) |
| goto err; |
| |
| rd_kafka_buf_read_i32(reply, &cnt); |
| |
| while (cnt-- > 0) { |
| int16_t ErrorCode; |
| rd_kafkap_str_t Group, GroupState, ProtoType, Proto; |
| int MemberCnt; |
| struct rd_kafka_group_info *gi; |
| |
| if (state->grplist->group_cnt == state->grplist_size) { |
| /* Grow group array */ |
| state->grplist_size *= 2; |
| state->grplist->groups = |
| rd_realloc(state->grplist->groups, |
| state->grplist_size * |
| sizeof(*state->grplist->groups)); |
| } |
| |
| gi = &state->grplist->groups[state->grplist->group_cnt++]; |
| memset(gi, 0, sizeof(*gi)); |
| |
| rd_kafka_buf_read_i16(reply, &ErrorCode); |
| rd_kafka_buf_read_str(reply, &Group); |
| rd_kafka_buf_read_str(reply, &GroupState); |
| rd_kafka_buf_read_str(reply, &ProtoType); |
| rd_kafka_buf_read_str(reply, &Proto); |
| rd_kafka_buf_read_i32(reply, &MemberCnt); |
| |
| if (MemberCnt > 100000) { |
| err = RD_KAFKA_RESP_ERR__BAD_MSG; |
| goto err; |
| } |
| |
| rd_kafka_broker_lock(rkb); |
| gi->broker.id = rkb->rkb_nodeid; |
| gi->broker.host = rd_strdup(rkb->rkb_origname); |
| gi->broker.port = rkb->rkb_port; |
| rd_kafka_broker_unlock(rkb); |
| |
| gi->err = ErrorCode; |
| gi->group = RD_KAFKAP_STR_DUP(&Group); |
| gi->state = RD_KAFKAP_STR_DUP(&GroupState); |
| gi->protocol_type = RD_KAFKAP_STR_DUP(&ProtoType); |
| gi->protocol = RD_KAFKAP_STR_DUP(&Proto); |
| |
| if (MemberCnt > 0) |
| gi->members = |
| rd_malloc(MemberCnt * sizeof(*gi->members)); |
| |
| while (MemberCnt-- > 0) { |
| rd_kafkap_str_t MemberId, ClientId, ClientHost; |
| rd_kafkap_bytes_t Meta, Assignment; |
| struct rd_kafka_group_member_info *mi; |
| |
| mi = &gi->members[gi->member_cnt++]; |
| memset(mi, 0, sizeof(*mi)); |
| |
| rd_kafka_buf_read_str(reply, &MemberId); |
| rd_kafka_buf_read_str(reply, &ClientId); |
| rd_kafka_buf_read_str(reply, &ClientHost); |
| rd_kafka_buf_read_bytes(reply, &Meta); |
| rd_kafka_buf_read_bytes(reply, &Assignment); |
| |
| mi->member_id = RD_KAFKAP_STR_DUP(&MemberId); |
| mi->client_id = RD_KAFKAP_STR_DUP(&ClientId); |
| mi->client_host = RD_KAFKAP_STR_DUP(&ClientHost); |
| |
| if (RD_KAFKAP_BYTES_LEN(&Meta) == 0) { |
| mi->member_metadata_size = 0; |
| mi->member_metadata = NULL; |
| } else { |
| mi->member_metadata_size = |
| RD_KAFKAP_BYTES_LEN(&Meta); |
| mi->member_metadata = |
| rd_memdup(Meta.data, |
| mi->member_metadata_size); |
| } |
| |
| if (RD_KAFKAP_BYTES_LEN(&Assignment) == 0) { |
| mi->member_assignment_size = 0; |
| mi->member_assignment = NULL; |
| } else { |
| mi->member_assignment_size = |
| RD_KAFKAP_BYTES_LEN(&Assignment); |
| mi->member_assignment = |
| rd_memdup(Assignment.data, |
| mi->member_assignment_size); |
| } |
| } |
| } |
| |
| err: |
| state->err = err; |
| return; |
| |
| err_parse: |
| state->err = reply->rkbuf_err; |
| } |
| |
| static void rd_kafka_ListGroups_resp_cb (rd_kafka_t *rk, |
| rd_kafka_broker_t *rkb, |
| rd_kafka_resp_err_t err, |
| rd_kafka_buf_t *reply, |
| rd_kafka_buf_t *request, |
| void *opaque) { |
| struct list_groups_state *state = opaque; |
| const int log_decode_errors = LOG_ERR; |
| int16_t ErrorCode; |
| char **grps; |
| int cnt, grpcnt, i = 0; |
| |
| state->wait_cnt--; |
| |
| if (err) |
| goto err; |
| |
| rd_kafka_buf_read_i16(reply, &ErrorCode); |
| if (ErrorCode) { |
| err = ErrorCode; |
| goto err; |
| } |
| |
| rd_kafka_buf_read_i32(reply, &cnt); |
| |
| if (state->desired_group) |
| grpcnt = 1; |
| else |
| grpcnt = cnt; |
| |
| if (cnt == 0 || grpcnt == 0) |
| return; |
| |
| grps = rd_malloc(sizeof(*grps) * grpcnt); |
| |
| while (cnt-- > 0) { |
| rd_kafkap_str_t grp, proto; |
| |
| rd_kafka_buf_read_str(reply, &grp); |
| rd_kafka_buf_read_str(reply, &proto); |
| |
| if (state->desired_group && |
| rd_kafkap_str_cmp_str(&grp, state->desired_group)) |
| continue; |
| |
| grps[i++] = RD_KAFKAP_STR_DUP(&grp); |
| |
| if (i == grpcnt) |
| break; |
| } |
| |
| if (i > 0) { |
| state->wait_cnt++; |
| rd_kafka_DescribeGroupsRequest(rkb, |
| (const char **)grps, i, |
| RD_KAFKA_REPLYQ(state->q, 0), |
| rd_kafka_DescribeGroups_resp_cb, |
| state); |
| |
| while (i-- > 0) |
| rd_free(grps[i]); |
| } |
| |
| |
| rd_free(grps); |
| |
| err: |
| state->err = err; |
| return; |
| |
| err_parse: |
| state->err = reply->rkbuf_err; |
| } |
| |
| rd_kafka_resp_err_t |
| rd_kafka_list_groups (rd_kafka_t *rk, const char *group, |
| const struct rd_kafka_group_list **grplistp, |
| int timeout_ms) { |
| rd_kafka_broker_t *rkb; |
| int rkb_cnt = 0; |
| struct list_groups_state state = RD_ZERO_INIT; |
| rd_ts_t ts_end = rd_timeout_init(timeout_ms); |
| int state_version = rd_kafka_brokers_get_state_version(rk); |
| |
| /* Wait until metadata has been fetched from cluster so |
| * that we have a full broker list. |
| * This state only happens during initial client setup, after that |
| * there'll always be a cached metadata copy. */ |
| rd_kafka_rdlock(rk); |
| while (!rk->rk_ts_metadata) { |
| rd_kafka_rdunlock(rk); |
| |
| if (!rd_kafka_brokers_wait_state_change( |
| rk, state_version, rd_timeout_remains(ts_end))) |
| return RD_KAFKA_RESP_ERR__TIMED_OUT; |
| |
| rd_kafka_rdlock(rk); |
| } |
| |
| state.q = rd_kafka_q_new(rk); |
| state.desired_group = group; |
| state.grplist = rd_calloc(1, sizeof(*state.grplist)); |
| state.grplist_size = group ? 1 : 32; |
| |
| state.grplist->groups = rd_malloc(state.grplist_size * |
| sizeof(*state.grplist->groups)); |
| |
| /* Query each broker for its list of groups */ |
| TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) { |
| rd_kafka_broker_lock(rkb); |
| if (rkb->rkb_nodeid == -1) { |
| rd_kafka_broker_unlock(rkb); |
| continue; |
| } |
| |
| state.wait_cnt++; |
| rd_kafka_ListGroupsRequest(rkb, |
| RD_KAFKA_REPLYQ(state.q, 0), |
| rd_kafka_ListGroups_resp_cb, |
| &state); |
| |
| rkb_cnt++; |
| |
| rd_kafka_broker_unlock(rkb); |
| |
| } |
| rd_kafka_rdunlock(rk); |
| |
| if (rkb_cnt == 0) { |
| state.err = RD_KAFKA_RESP_ERR__TRANSPORT; |
| |
| } else { |
| while (state.wait_cnt > 0) { |
| rd_kafka_q_serve(state.q, 100, 0, |
| RD_KAFKA_Q_CB_CALLBACK, |
| rd_kafka_poll_cb, NULL); |
| /* Ignore yields */ |
| } |
| } |
| |
| rd_kafka_q_destroy(state.q); |
| |
| if (state.err) |
| rd_kafka_group_list_destroy(state.grplist); |
| else |
| *grplistp = state.grplist; |
| |
| return state.err; |
| } |
| |
| |
| void rd_kafka_group_list_destroy (const struct rd_kafka_group_list *grplist0) { |
| struct rd_kafka_group_list *grplist = |
| (struct rd_kafka_group_list *)grplist0; |
| |
| while (grplist->group_cnt-- > 0) { |
| struct rd_kafka_group_info *gi; |
| gi = &grplist->groups[grplist->group_cnt]; |
| |
| if (gi->broker.host) |
| rd_free(gi->broker.host); |
| if (gi->group) |
| rd_free(gi->group); |
| if (gi->state) |
| rd_free(gi->state); |
| if (gi->protocol_type) |
| rd_free(gi->protocol_type); |
| if (gi->protocol) |
| rd_free(gi->protocol); |
| |
| while (gi->member_cnt-- > 0) { |
| struct rd_kafka_group_member_info *mi; |
| mi = &gi->members[gi->member_cnt]; |
| |
| if (mi->member_id) |
| rd_free(mi->member_id); |
| if (mi->client_id) |
| rd_free(mi->client_id); |
| if (mi->client_host) |
| rd_free(mi->client_host); |
| if (mi->member_metadata) |
| rd_free(mi->member_metadata); |
| if (mi->member_assignment) |
| rd_free(mi->member_assignment); |
| } |
| |
| if (gi->members) |
| rd_free(gi->members); |
| } |
| |
| if (grplist->groups) |
| rd_free(grplist->groups); |
| |
| rd_free(grplist); |
| } |
| |
| |
| |
| const char *rd_kafka_get_debug_contexts(void) { |
| return RD_KAFKA_DEBUG_CONTEXTS; |
| } |
| |
| |
| int rd_kafka_path_is_dir (const char *path) { |
| #ifdef _MSC_VER |
| struct _stat st; |
| return (_stat(path, &st) == 0 && st.st_mode & S_IFDIR); |
| #else |
| struct stat st; |
| return (stat(path, &st) == 0 && S_ISDIR(st.st_mode)); |
| #endif |
| } |
| |
| |
| void rd_kafka_mem_free (rd_kafka_t *rk, void *ptr) { |
| free(ptr); |
| } |
| |
| |
| int rd_kafka_errno (void) { |
| return errno; |
| } |
| |
| int rd_kafka_unittest (void) { |
| return rd_unittest(); |
| } |
| |
| |
| #if ENABLE_SHAREDPTR_DEBUG |
| struct rd_shptr0_head rd_shared_ptr_debug_list; |
| mtx_t rd_shared_ptr_debug_mtx; |
| |
| void rd_shared_ptrs_dump (void) { |
| rd_shptr0_t *sptr; |
| |
| printf("################ Current shared pointers ################\n"); |
| printf("### op_cnt: %d\n", rd_atomic32_get(&rd_kafka_op_cnt)); |
| mtx_lock(&rd_shared_ptr_debug_mtx); |
| LIST_FOREACH(sptr, &rd_shared_ptr_debug_list, link) |
| printf("# shptr ((%s*)%p): object %p refcnt %d: at %s:%d\n", |
| sptr->typename, sptr, sptr->obj, |
| rd_refcnt_get(sptr->ref), sptr->func, sptr->line); |
| mtx_unlock(&rd_shared_ptr_debug_mtx); |
| printf("#########################################################\n"); |
| } |
| #endif |