blob: a0b77e1f92c39137efc4e9f4a1e770c11a211e2e [file] [log] [blame]
/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2012-2015, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#pragma once
#include "rdavl.h"
/**
 * @brief Parse a Metadata response.
 *
 * NOTE(review): the definition is not visible from this header. Judging by
 * the parameter names, \p rkbuf is presumably the response buffer received
 * from broker \p rkb in reply to \p request -- confirm against the
 * implementation.
 *
 * @returns a pointer to a struct rd_kafka_metadata. Ownership of the returned
 *          object and the value returned on failure are not stated here --
 *          TODO confirm.
 */
struct rd_kafka_metadata *
rd_kafka_parse_Metadata (rd_kafka_broker_t *rkb,
rd_kafka_buf_t *request, rd_kafka_buf_t *rkbuf);
/**
 * @brief Copy the metadata object \p md.
 *
 * NOTE(review): \p size is presumably the total allocation size of \p md in
 * bytes; the header does not say -- confirm against the implementation.
 *
 * @returns a pointer to a struct rd_kafka_metadata (presumably the new copy;
 *          ownership is not stated here -- TODO confirm).
 */
struct rd_kafka_metadata *
rd_kafka_metadata_copy (const struct rd_kafka_metadata *md, size_t size);
/**
 * @brief Match topics against the read-only list \p match, using the
 *        writable list \p tinfos.
 *
 * NOTE(review): the exact matching rules (e.g. whether patterns are
 * supported) and what is stored in \p tinfos are not visible in this header
 * -- confirm against the implementation.
 *
 * @returns a count as size_t (presumably the number of matched topics --
 *          TODO confirm).
 */
size_t
rd_kafka_metadata_topic_match (rd_kafka_t *rk, rd_list_t *tinfos,
const rd_kafka_topic_partition_list_t *match);
/**
 * @brief Filter topics against the read-only list \p match, using the
 *        writable list \p tinfos.
 *
 * Takes the same parameters as rd_kafka_metadata_topic_match().
 * NOTE(review): how filtering differs from matching is not visible in this
 * header -- confirm against the implementation.
 *
 * @returns a count as size_t (presumably the number of topics that passed
 *          the filter -- TODO confirm).
 */
size_t
rd_kafka_metadata_topic_filter (rd_kafka_t *rk, rd_list_t *tinfos,
const rd_kafka_topic_partition_list_t *match);
/**
 * @brief Log the metadata object \p md (read-only) for client instance \p rk.
 *
 * NOTE(review): \p fac is presumably the log facility string attached to the
 * emitted log lines -- confirm against the implementation.
 */
void rd_kafka_metadata_log (rd_kafka_t *rk, const char *fac,
const struct rd_kafka_metadata *md);
/**
 * @brief Refresh metadata for the topics in the read-only list \p topics.
 *
 * NOTE(review): semantics below are inferred from parameter names only;
 * the definition is not visible from this header -- confirm:
 *  - \p rkb is presumably the broker to query,
 *  - \p force presumably forces a refresh,
 *  - \p reason is presumably a human-readable string used for logging.
 *
 * @returns an rd_kafka_resp_err_t error code.
 */
rd_kafka_resp_err_t
rd_kafka_metadata_refresh_topics (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
const rd_list_t *topics, int force,
const char *reason);
/**
 * @brief Refresh metadata for the known topics.
 *
 * NOTE(review): what constitutes a "known" topic is not visible in this
 * header. \p force and \p reason presumably have the same meaning as for
 * rd_kafka_metadata_refresh_topics() -- confirm.
 *
 * @returns an rd_kafka_resp_err_t error code.
 */
rd_kafka_resp_err_t
rd_kafka_metadata_refresh_known_topics (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
int force, const char *reason);
/**
 * @brief Refresh broker metadata.
 *
 * NOTE(review): presumably requests the broker list without topic
 * information (compare the rkmc_full_brokers_sent field of
 * struct rd_kafka_metadata_cache) -- confirm against the implementation.
 *
 * @returns an rd_kafka_resp_err_t error code.
 */
rd_kafka_resp_err_t
rd_kafka_metadata_refresh_brokers (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
const char *reason);
/**
 * @brief Refresh all metadata.
 *
 * NOTE(review): presumably requests metadata for all topics in the cluster
 * (compare the rkmc_full_topics_sent field of
 * struct rd_kafka_metadata_cache) -- confirm against the implementation.
 *
 * @returns an rd_kafka_resp_err_t error code.
 */
rd_kafka_resp_err_t
rd_kafka_metadata_refresh_all (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
const char *reason);
/**
 * @brief Issue a metadata request for the read-only list \p topics.
 *
 * NOTE(review): semantics below are inferred from parameter names only;
 * the definition is not visible from this header -- confirm:
 *  - \p rkb is presumably the broker to send the request to,
 *  - \p reason is presumably a human-readable string used for logging,
 *  - \p rko is presumably an op associated with the request/reply; whether
 *    it may be NULL and who owns it is not stated here.
 *
 * @returns an rd_kafka_resp_err_t error code.
 */
rd_kafka_resp_err_t
rd_kafka_metadata_request (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
const rd_list_t *topics,
const char *reason, rd_kafka_op_t *rko);
/**
* @{
*
* @brief Metadata cache
*/
/**
 * @brief A single metadata cache entry holding the cached metadata of one
 *        topic (rkmce_mtopic) together with its insert and expiry
 *        timestamps.
 *
 * Each entry carries two linkage members: an AVL node (for rkmc_avl) and a
 * TAILQ link (for rkmc_expiry), both in struct rd_kafka_metadata_cache.
 */
struct rd_kafka_metadata_cache_entry {
rd_avl_node_t rkmce_avlnode; /* Linkage for rkmc_avl */
TAILQ_ENTRY(rd_kafka_metadata_cache_entry) rkmce_link; /* Linkage for rkmc_expiry */
rd_ts_t rkmce_ts_expires; /* Expire time */
rd_ts_t rkmce_ts_insert; /* Insert time */
rd_kafka_metadata_topic_t rkmce_mtopic; /* Cached topic metadata */
/* Original note: "rkmce_partitions memory points here."
 * NOTE(review): no member named rkmce_partitions is declared in this
 * struct; this presumably means the partition array referenced from
 * rkmce_mtopic is stored in memory trailing this struct, in which case
 * rkmce_mtopic must remain the last member -- confirm against the
 * allocation code. */
};
/**
 * @brief Evaluates to non-zero if the topic error code stored in the cache
 *        entry \p rkmce (a pointer) is anything other than
 *        RD_KAFKA_RESP_ERR__WAIT_CACHE, and to zero if it is exactly that
 *        code.
 *
 * \p rkmce is evaluated exactly once.
 */
#define RD_KAFKA_METADATA_CACHE_VALID(rkmce) \
(RD_KAFKA_RESP_ERR__WAIT_CACHE != (rkmce)->rkmce_mtopic.err)
/**
 * @brief Metadata cache state.
 *
 * Holds struct rd_kafka_metadata_cache_entry objects, which link into both
 * rkmc_avl (through rkmce_avlnode) and rkmc_expiry (through rkmce_link),
 * together with timers, bookkeeping for outstanding full metadata requests,
 * and a condition variable used by cache_wait_change().
 */
struct rd_kafka_metadata_cache {
rd_avl_t rkmc_avl; /* AVL tree of cache entries */
TAILQ_HEAD(, rd_kafka_metadata_cache_entry) rkmc_expiry; /* Queue of cache
* entries; presumably
* ordered by expiry
* time -- TODO confirm */
rd_kafka_timer_t rkmc_expiry_tmr; /* Timer; presumably drives entry
* expiry -- TODO confirm */
int rkmc_cnt; /* Presumably the number of cache
* entries -- TODO confirm */
/* Protected by rkmc_full_lock: */
mtx_t rkmc_full_lock;
int rkmc_full_topics_sent; /* Full MetadataRequest for
* all topics has been sent,
* awaiting response. */
int rkmc_full_brokers_sent; /* Full MetadataRequest for
* all brokers (but not topics)
* has been sent,
* awaiting response. */
rd_kafka_timer_t rkmc_query_tmr; /* Query timer for topics without
* leaders. */
cnd_t rkmc_cnd; /* cache_wait_change() cond. */
mtx_t rkmc_cnd_lock; /* lock for rkmc_cnd */
};
/**
 * @brief Start cache expiry handling for client instance \p rk.
 *
 * NOTE(review): presumably (re)arms the rkmc_expiry_tmr timer of
 * struct rd_kafka_metadata_cache -- confirm against the implementation.
 */
void rd_kafka_metadata_cache_expiry_start (rd_kafka_t *rk);
/**
 * @brief Update the metadata cache with the single topic \p mdt (read-only).
 *
 * NOTE(review): locking requirements and whether an existing entry is
 * replaced are not visible in this header -- confirm.
 */
void
rd_kafka_metadata_cache_topic_update (rd_kafka_t *rk,
const rd_kafka_metadata_topic_t *mdt);
/**
 * @brief Update the metadata cache from the metadata object \p md
 *        (read-only).
 *
 * NOTE(review): \p abs_update presumably indicates an absolute update, i.e.
 * that \p md is the complete set and should replace rather than augment the
 * cache contents -- confirm against the implementation.
 */
void rd_kafka_metadata_cache_update (rd_kafka_t *rk,
const rd_kafka_metadata_t *md,
int abs_update);
/**
 * @brief Look up the cache entry for \p topic.
 *
 * NOTE(review): \p valid presumably restricts the lookup to entries for
 * which RD_KAFKA_METADATA_CACHE_VALID() holds; the return value when no
 * entry is found is presumably NULL. Neither is visible in this header --
 * confirm against the implementation.
 *
 * @returns a pointer to a struct rd_kafka_metadata_cache_entry.
 */
struct rd_kafka_metadata_cache_entry *
rd_kafka_metadata_cache_find (rd_kafka_t *rk, const char *topic, int valid);
/**
 * @brief Purge cache hints for the topics in the read-only list \p topics.
 *
 * NOTE(review): a "hint" is presumably a cache entry whose topic error is
 * RD_KAFKA_RESP_ERR__WAIT_CACHE (see RD_KAFKA_METADATA_CACHE_VALID()) --
 * confirm against the implementation.
 */
void rd_kafka_metadata_cache_purge_hints (rd_kafka_t *rk,
const rd_list_t *topics);
/**
 * @brief Add cache hints for the topics in the read-only list \p topics.
 *
 * NOTE(review): semantics below are inferred from parameter names only --
 * confirm against the implementation:
 *  - \p dst is presumably an output list of topics,
 *  - \p replace presumably controls whether existing entries are replaced.
 *
 * @returns an int (presumably a topic count -- TODO confirm).
 */
int rd_kafka_metadata_cache_hint (rd_kafka_t *rk,
const rd_list_t *topics, rd_list_t *dst,
int replace);
/**
 * @brief Variant of rd_kafka_metadata_cache_hint() taking the topics as a
 *        read-only rd_kafka_topic_partition_list_t (\p rktparlist) instead
 *        of an rd_list_t.
 *
 * NOTE(review): \p dst and \p replace presumably have the same meaning as
 * for rd_kafka_metadata_cache_hint() -- confirm.
 *
 * @returns an int (presumably a topic count -- TODO confirm).
 */
int rd_kafka_metadata_cache_hint_rktparlist (
rd_kafka_t *rk,
const rd_kafka_topic_partition_list_t *rktparlist,
rd_list_t *dst,
int replace);
/**
 * @brief Get the cached topic metadata for \p topic.
 *
 * NOTE(review): \p valid presumably restricts the lookup to valid entries
 * (see RD_KAFKA_METADATA_CACHE_VALID()), and the result is presumably NULL
 * when nothing is found -- confirm against the implementation.
 *
 * @returns a read-only pointer to an rd_kafka_metadata_topic_t. Its lifetime
 *          is not stated here -- TODO confirm.
 */
const rd_kafka_metadata_topic_t *
rd_kafka_metadata_cache_topic_get (rd_kafka_t *rk, const char *topic,
int valid);
/**
 * @brief Get the cached metadata for \p partition of \p topic.
 *
 * \p mtopicp and \p mpartp are pointer-to-pointer parameters, i.e. outputs
 * through which read-only topic and partition metadata pointers can be
 * passed back to the caller.
 *
 * NOTE(review): the meaning of the int return value, when the outputs are
 * set, and the meaning of \p valid are not visible in this header --
 * confirm against the implementation.
 */
int rd_kafka_metadata_cache_topic_partition_get (
rd_kafka_t *rk,
const rd_kafka_metadata_topic_t **mtopicp,
const rd_kafka_metadata_partition_t **mpartp,
const char *topic, int32_t partition, int valid);
/**
 * @brief Count how many of the topics in the read-only list \p topics exist
 *        in the cache.
 *
 * NOTE(review): \p metadata_agep is an output pointer, presumably set to an
 * age of the cached metadata (units not stated here) -- confirm against the
 * implementation.
 *
 * @returns an int (presumably the number of topics found -- TODO confirm).
 */
int rd_kafka_metadata_cache_topics_count_exists (rd_kafka_t *rk,
const rd_list_t *topics,
int *metadata_agep);
/**
 * @brief Filter the read-only topic list \p src into \p dst based on cache
 *        hints.
 *
 * NOTE(review): whether hinted topics are the ones kept or the ones dropped
 * is not visible in this header -- confirm against the implementation.
 *
 * @returns an int (presumably the number of elements in \p dst -- TODO
 *          confirm).
 */
int rd_kafka_metadata_cache_topics_filter_hinted (rd_kafka_t *rk,
rd_list_t *dst,
const rd_list_t *src);
/**
 * @brief Trigger fast leader querying for client instance \p rk.
 *
 * NOTE(review): presumably (re)arms the rkmc_query_tmr timer of
 * struct rd_kafka_metadata_cache, which is documented as the query timer
 * for topics without leaders -- confirm against the implementation.
 */
void rd_kafka_metadata_fast_leader_query (rd_kafka_t *rk);
/**
 * @brief Initialize the metadata cache of client instance \p rk.
 */
void rd_kafka_metadata_cache_init (rd_kafka_t *rk);
/**
 * @brief Destroy the metadata cache of client instance \p rk.
 */
void rd_kafka_metadata_cache_destroy (rd_kafka_t *rk);
/**
 * @brief Wait for the metadata cache to change, for at most \p timeout_ms
 *        milliseconds.
 *
 * The rkmc_cnd member of struct rd_kafka_metadata_cache is documented as the
 * condition variable for this function.
 *
 * NOTE(review): the meaning of the int return value (e.g. changed versus
 * timed out) is not visible in this header -- confirm.
 */
int rd_kafka_metadata_cache_wait_change (rd_kafka_t *rk, int timeout_ms);
/**
 * @brief Dump the metadata cache of client instance \p rk to the stream
 *        \p fp.
 *
 * NOTE(review): output format and locking requirements are not visible in
 * this header -- confirm.
 */
void rd_kafka_metadata_cache_dump (FILE *fp, rd_kafka_t *rk);
/**@}*/