| /* |
| * librdkafka - Apache Kafka C library |
| * |
| * Copyright (c) 2012,2013 Magnus Edenhill |
| * All rights reserved. |
| * |
| * Redistribution and use in source and binary forms, with or without |
| * modification, are permitted provided that the following conditions are met: |
| * |
| * 1. Redistributions of source code must retain the above copyright notice, |
| * this list of conditions and the following disclaimer. |
| * 2. Redistributions in binary form must reproduce the above copyright notice, |
| * this list of conditions and the following disclaimer in the documentation |
| * and/or other materials provided with the distribution. |
| * |
| * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
| * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
| * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
| * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
| * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
| * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
| * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
| * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
| * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
| * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
| * POSSIBILITY OF SUCH DAMAGE. |
| */ |
| |
| #include "rdkafka_int.h" |
| #include "rd.h" |
| |
| #include <stdlib.h> |
| #include <ctype.h> |
| #include <stddef.h> |
| |
| #include "rdkafka_int.h" |
| #include "rdkafka_feature.h" |
| #include "rdkafka_interceptor.h" |
| #if WITH_PLUGINS |
| #include "rdkafka_plugin.h" |
| #endif |
| |
| struct rd_kafka_property { |
| rd_kafka_conf_scope_t scope; |
| const char *name; |
| enum { |
| _RK_C_STR, |
| _RK_C_INT, |
| _RK_C_S2I, /* String to Integer mapping. |
| * Supports limited canonical str->int mappings |
| * using s2i[] */ |
| _RK_C_S2F, /* CSV String to Integer flag mapping (OR:ed) */ |
| _RK_C_BOOL, |
| _RK_C_PTR, /* Only settable through special set functions */ |
| _RK_C_PATLIST, /* Pattern list */ |
| _RK_C_KSTR, /* Kafka string */ |
| _RK_C_ALIAS, /* Alias: points to other property through .sdef */ |
| _RK_C_INTERNAL, /* Internal, don't expose to application */ |
| _RK_C_INVALID, /* Invalid property, used to catch known |
| * but unsupported Java properties. */ |
| } type; |
| int offset; |
| const char *desc; |
| int vmin; |
| int vmax; |
| int vdef; /* Default value (int) */ |
| const char *sdef; /* Default value (string) */ |
| void *pdef; /* Default value (pointer) */ |
| struct { |
| int val; |
| const char *str; |
| } s2i[16]; /* _RK_C_S2I and _RK_C_S2F */ |
| |
| /* Value validator (STR) */ |
| int (*validate) (const struct rd_kafka_property *prop, |
| const char *val, int ival); |
| |
| /* Configuration object constructors and destructor for use when |
| * the property value itself is not used, or needs extra care. */ |
| void (*ctor) (int scope, void *pconf); |
| void (*dtor) (int scope, void *pconf); |
| void (*copy) (int scope, void *pdst, const void *psrc, |
| void *dstptr, const void *srcptr, |
| size_t filter_cnt, const char **filter); |
| |
| rd_kafka_conf_res_t (*set) (int scope, void *pconf, |
| const char *name, const char *value, |
| void *dstptr, |
| rd_kafka_conf_set_mode_t set_mode, |
| char *errstr, size_t errstr_size); |
| }; |
| |
| |
| #define _RK(field) offsetof(rd_kafka_conf_t, field) |
| #define _RKT(field) offsetof(rd_kafka_topic_conf_t, field) |
| |
| |
| static rd_kafka_conf_res_t |
| rd_kafka_anyconf_get0 (const void *conf, const struct rd_kafka_property *prop, |
| char *dest, size_t *dest_size); |
| |
| |
| /** |
| * @brief Validate \p broker.version.fallback property. |
| */ |
| static int |
| rd_kafka_conf_validate_broker_version (const struct rd_kafka_property *prop, |
| const char *val, int ival) { |
| struct rd_kafka_ApiVersion *apis; |
| size_t api_cnt; |
| return rd_kafka_get_legacy_ApiVersions(val, &apis, &api_cnt, NULL); |
| } |
| |
| /** |
| * @brief Validate that string is a single item, without delimters (, space). |
| */ |
| static RD_UNUSED int |
| rd_kafka_conf_validate_single (const struct rd_kafka_property *prop, |
| const char *val, int ival) { |
| return !strchr(val, ',') && !strchr(val, ' '); |
| } |
| |
| |
| /** |
| * librdkafka configuration property definitions. |
| */ |
| static const struct rd_kafka_property rd_kafka_properties[] = { |
| /* Global properties */ |
| { _RK_GLOBAL, "builtin.features", _RK_C_S2F, _RK(builtin_features), |
| "Indicates the builtin features for this build of librdkafka. " |
| "An application can either query this value or attempt to set it " |
| "with its list of required features to check for library support.", |
| 0, 0x7fffffff, 0xffff, |
| .s2i = { |
| #if WITH_ZLIB |
| { 0x1, "gzip" }, |
| #endif |
| #if WITH_SNAPPY |
| { 0x2, "snappy" }, |
| #endif |
| #if WITH_SSL |
| { 0x4, "ssl" }, |
| #endif |
| { 0x8, "sasl" }, |
| { 0x10, "regex" }, |
| { 0x20, "lz4" }, |
| #if defined(_MSC_VER) || WITH_SASL_CYRUS |
| { 0x40, "sasl_gssapi" }, |
| #endif |
| { 0x80, "sasl_plain" }, |
| #if WITH_SASL_SCRAM |
| { 0x100, "sasl_scram" }, |
| #endif |
| #if WITH_PLUGINS |
| { 0x200, "plugins" }, |
| #endif |
| { 0, NULL } |
| } |
| }, |
| { _RK_GLOBAL, "client.id", _RK_C_STR, _RK(client_id_str), |
| "Client identifier.", |
| .sdef = "rdkafka" }, |
| { _RK_GLOBAL, "metadata.broker.list", _RK_C_STR, _RK(brokerlist), |
| "Initial list of brokers as a CSV list of broker host or host:port. " |
| "The application may also use `rd_kafka_brokers_add()` to add " |
| "brokers during runtime." }, |
| { _RK_GLOBAL, "bootstrap.servers", _RK_C_ALIAS, 0, |
| "See metadata.broker.list", |
| .sdef = "metadata.broker.list" }, |
| { _RK_GLOBAL, "message.max.bytes", _RK_C_INT, _RK(max_msg_size), |
| "Maximum Kafka protocol request message size.", |
| 1000, 1000000000, 1000000 }, |
| { _RK_GLOBAL, "message.copy.max.bytes", _RK_C_INT, |
| _RK(msg_copy_max_size), |
| "Maximum size for message to be copied to buffer. " |
| "Messages larger than this will be passed by reference (zero-copy) " |
| "at the expense of larger iovecs.", |
| 0, 1000000000, 0xffff }, |
| { _RK_GLOBAL, "receive.message.max.bytes", _RK_C_INT, |
| _RK(recv_max_msg_size), |
| "Maximum Kafka protocol response message size. " |
| "This is a safety precaution to avoid memory exhaustion in case of " |
| "protocol hickups. " |
| "The value should be at least fetch.message.max.bytes * number of " |
| "partitions consumed from + messaging overhead (e.g. 200000 bytes).", |
| 1000, 1000000000, 100000000 }, |
| { _RK_GLOBAL, "max.in.flight.requests.per.connection", _RK_C_INT, |
| _RK(max_inflight), |
| "Maximum number of in-flight requests per broker connection. " |
| "This is a generic property applied to all broker communication, " |
| "however it is primarily relevant to produce requests. " |
| "In particular, note that other mechanisms limit the number " |
| "of outstanding consumer fetch request per broker to one.", |
| 1, 1000000, 1000000 }, |
| { _RK_GLOBAL, "max.in.flight", _RK_C_ALIAS, |
| .sdef = "max.in.flight.requests.per.connection" }, |
| { _RK_GLOBAL, "metadata.request.timeout.ms", _RK_C_INT, |
| _RK(metadata_request_timeout_ms), |
| "Non-topic request timeout in milliseconds. " |
| "This is for metadata requests, etc.", |
| 10, 900*1000, 60*1000}, |
| { _RK_GLOBAL, "topic.metadata.refresh.interval.ms", _RK_C_INT, |
| _RK(metadata_refresh_interval_ms), |
| "Topic metadata refresh interval in milliseconds. " |
| "The metadata is automatically refreshed on error and connect. " |
| "Use -1 to disable the intervalled refresh.", |
| -1, 3600*1000, 5*60*1000 }, |
| { _RK_GLOBAL, "metadata.max.age.ms", _RK_C_INT, |
| _RK(metadata_max_age_ms), |
| "Metadata cache max age. " |
| "Defaults to metadata.refresh.interval.ms * 3", |
| 1, 24*3600*1000, -1 }, |
| { _RK_GLOBAL, "topic.metadata.refresh.fast.interval.ms", _RK_C_INT, |
| _RK(metadata_refresh_fast_interval_ms), |
| "When a topic loses its leader a new metadata request will be " |
| "enqueued with this initial interval, exponentially increasing " |
| "until the topic metadata has been refreshed. " |
| "This is used to recover quickly from transitioning leader brokers.", |
| 1, 60*1000, 250 }, |
| { _RK_GLOBAL, "topic.metadata.refresh.fast.cnt", _RK_C_INT, |
| _RK(metadata_refresh_fast_cnt), |
| "*Deprecated: No longer used.*", |
| 0, 1000, 10 }, |
| { _RK_GLOBAL, "topic.metadata.refresh.sparse", _RK_C_BOOL, |
| _RK(metadata_refresh_sparse), |
| "Sparse metadata requests (consumes less network bandwidth)", |
| 0, 1, 1 }, |
| { _RK_GLOBAL, "topic.blacklist", _RK_C_PATLIST, |
| _RK(topic_blacklist), |
| "Topic blacklist, a comma-separated list of regular expressions " |
| "for matching topic names that should be ignored in " |
| "broker metadata information as if the topics did not exist." }, |
| { _RK_GLOBAL, "debug", _RK_C_S2F, _RK(debug), |
| "A comma-separated list of debug contexts to enable. " |
| "Debugging the Producer: broker,topic,msg. Consumer: cgrp,topic,fetch", |
| .s2i = { |
| { RD_KAFKA_DBG_GENERIC, "generic" }, |
| { RD_KAFKA_DBG_BROKER, "broker" }, |
| { RD_KAFKA_DBG_TOPIC, "topic" }, |
| { RD_KAFKA_DBG_METADATA, "metadata" }, |
| { RD_KAFKA_DBG_QUEUE, "queue" }, |
| { RD_KAFKA_DBG_MSG, "msg" }, |
| { RD_KAFKA_DBG_PROTOCOL, "protocol" }, |
| { RD_KAFKA_DBG_CGRP, "cgrp" }, |
| { RD_KAFKA_DBG_SECURITY, "security" }, |
| { RD_KAFKA_DBG_FETCH, "fetch" }, |
| { RD_KAFKA_DBG_FEATURE, "feature" }, |
| { RD_KAFKA_DBG_INTERCEPTOR, "interceptor" }, |
| { RD_KAFKA_DBG_PLUGIN, "plugin" }, |
| { RD_KAFKA_DBG_ALL, "all" }, |
| } }, |
| { _RK_GLOBAL, "socket.timeout.ms", _RK_C_INT, _RK(socket_timeout_ms), |
| "Timeout for network requests.", |
| 10, 300*1000, 60*1000 }, |
| { _RK_GLOBAL, "socket.blocking.max.ms", _RK_C_INT, |
| _RK(socket_blocking_max_ms), |
| "Maximum time a broker socket operation may block. " |
| "A lower value improves responsiveness at the expense of " |
| "slightly higher CPU usage. **Deprecated**", |
| 1, 60*1000, 1000 }, |
| { _RK_GLOBAL, "socket.send.buffer.bytes", _RK_C_INT, |
| _RK(socket_sndbuf_size), |
| "Broker socket send buffer size. System default is used if 0.", |
| 0, 100000000, 0 }, |
| { _RK_GLOBAL, "socket.receive.buffer.bytes", _RK_C_INT, |
| _RK(socket_rcvbuf_size), |
| "Broker socket receive buffer size. System default is used if 0.", |
| 0, 100000000, 0 }, |
| { _RK_GLOBAL, "socket.keepalive.enable", _RK_C_BOOL, |
| _RK(socket_keepalive), |
| "Enable TCP keep-alives (SO_KEEPALIVE) on broker sockets", |
| 0, 1, 0 }, |
| { _RK_GLOBAL, "socket.nagle.disable", _RK_C_BOOL, |
| _RK(socket_nagle_disable), |
| "Disable the Nagle algorithm (TCP_NODELAY).", |
| 0, 1, 0 }, |
| { _RK_GLOBAL, "socket.max.fails", _RK_C_INT, |
| _RK(socket_max_fails), |
| "Disconnect from broker when this number of send failures " |
| "(e.g., timed out requests) is reached. Disable with 0. " |
| "NOTE: The connection is automatically re-established.", |
| 0, 1000000, 3 }, |
| { _RK_GLOBAL, "broker.address.ttl", _RK_C_INT, |
| _RK(broker_addr_ttl), |
| "How long to cache the broker address resolving " |
| "results (milliseconds).", |
| 0, 86400*1000, 1*1000 }, |
| { _RK_GLOBAL, "broker.address.family", _RK_C_S2I, |
| _RK(broker_addr_family), |
| "Allowed broker IP address families: any, v4, v6", |
| .vdef = AF_UNSPEC, |
| .s2i = { |
| { AF_UNSPEC, "any" }, |
| { AF_INET, "v4" }, |
| { AF_INET6, "v6" }, |
| } }, |
| { _RK_GLOBAL, "reconnect.backoff.jitter.ms", _RK_C_INT, |
| _RK(reconnect_jitter_ms), |
| "Throttle broker reconnection attempts by this value +-50%.", |
| 0, 60*60*1000, 500 }, |
| { _RK_GLOBAL, "statistics.interval.ms", _RK_C_INT, |
| _RK(stats_interval_ms), |
| "librdkafka statistics emit interval. The application also needs to " |
| "register a stats callback using `rd_kafka_conf_set_stats_cb()`. " |
| "The granularity is 1000ms. A value of 0 disables statistics.", |
| 0, 86400*1000, 0 }, |
| { _RK_GLOBAL, "enabled_events", _RK_C_INT, |
| _RK(enabled_events), |
| "See `rd_kafka_conf_set_events()`", |
| 0, 0x7fffffff, 0 }, |
| { _RK_GLOBAL, "error_cb", _RK_C_PTR, |
| _RK(error_cb), |
| "Error callback (set with rd_kafka_conf_set_error_cb())" }, |
| { _RK_GLOBAL, "throttle_cb", _RK_C_PTR, |
| _RK(throttle_cb), |
| "Throttle callback (set with rd_kafka_conf_set_throttle_cb())" }, |
| { _RK_GLOBAL, "stats_cb", _RK_C_PTR, |
| _RK(stats_cb), |
| "Statistics callback (set with rd_kafka_conf_set_stats_cb())" }, |
| { _RK_GLOBAL, "log_cb", _RK_C_PTR, |
| _RK(log_cb), |
| "Log callback (set with rd_kafka_conf_set_log_cb())", |
| .pdef = rd_kafka_log_print }, |
| { _RK_GLOBAL, "log_level", _RK_C_INT, |
| _RK(log_level), |
| "Logging level (syslog(3) levels)", |
| 0, 7, 6 }, |
| { _RK_GLOBAL, "log.queue", _RK_C_BOOL, _RK(log_queue), |
| "Disable spontaneous log_cb from internal librdkafka " |
| "threads, instead enqueue log messages on queue set with " |
| "`rd_kafka_set_log_queue()` and serve log callbacks or " |
| "events through the standard poll APIs. " |
| "**NOTE**: Log messages will linger in a temporary queue " |
| "until the log queue has been set.", |
| 0, 1, 0 }, |
| { _RK_GLOBAL, "log.thread.name", _RK_C_BOOL, |
| _RK(log_thread_name), |
| "Print internal thread name in log messages " |
| "(useful for debugging librdkafka internals)", |
| 0, 1, 1 }, |
| { _RK_GLOBAL, "log.connection.close", _RK_C_BOOL, |
| _RK(log_connection_close), |
| "Log broker disconnects. " |
| "It might be useful to turn this off when interacting with " |
| "0.9 brokers with an aggressive `connection.max.idle.ms` value.", |
| 0, 1, 1 }, |
| { _RK_GLOBAL, "socket_cb", _RK_C_PTR, |
| _RK(socket_cb), |
| "Socket creation callback to provide race-free CLOEXEC", |
| .pdef = |
| #ifdef __linux__ |
| rd_kafka_socket_cb_linux |
| #else |
| rd_kafka_socket_cb_generic |
| #endif |
| }, |
| { _RK_GLOBAL, "connect_cb", _RK_C_PTR, |
| _RK(connect_cb), |
| "Socket connect callback", |
| }, |
| { _RK_GLOBAL, "closesocket_cb", _RK_C_PTR, |
| _RK(closesocket_cb), |
| "Socket close callback", |
| }, |
| { _RK_GLOBAL, "open_cb", _RK_C_PTR, |
| _RK(open_cb), |
| "File open callback to provide race-free CLOEXEC", |
| .pdef = |
| #ifdef __linux__ |
| rd_kafka_open_cb_linux |
| #else |
| rd_kafka_open_cb_generic |
| #endif |
| }, |
| { _RK_GLOBAL, "opaque", _RK_C_PTR, |
| _RK(opaque), |
| "Application opaque (set with rd_kafka_conf_set_opaque())" }, |
| { _RK_GLOBAL, "default_topic_conf", _RK_C_PTR, |
| _RK(topic_conf), |
| "Default topic configuration for automatically subscribed topics" }, |
| { _RK_GLOBAL, "internal.termination.signal", _RK_C_INT, |
| _RK(term_sig), |
| "Signal that librdkafka will use to quickly terminate on " |
| "rd_kafka_destroy(). If this signal is not set then there will be a " |
| "delay before rd_kafka_wait_destroyed() returns true " |
| "as internal threads are timing out their system calls. " |
| "If this signal is set however the delay will be minimal. " |
| "The application should mask this signal as an internal " |
| "signal handler is installed.", |
| 0, 128, 0 }, |
| { _RK_GLOBAL, "api.version.request", _RK_C_BOOL, |
| _RK(api_version_request), |
| "Request broker's supported API versions to adjust functionality to " |
| "available protocol features. If set to false, or the " |
| "ApiVersionRequest fails, the fallback version " |
| "`broker.version.fallback` will be used. " |
| "**NOTE**: Depends on broker version >=0.10.0. If the request is not " |
| "supported by (an older) broker the `broker.version.fallback` fallback is used.", |
| 0, 1, 1 }, |
| { _RK_GLOBAL, "api.version.request.timeout.ms", _RK_C_INT, |
| _RK(api_version_request_timeout_ms), |
| "Timeout for broker API version requests.", |
| 1, 5*60*1000, 10*1000 }, |
| { _RK_GLOBAL, "api.version.fallback.ms", _RK_C_INT, |
| _RK(api_version_fallback_ms), |
| "Dictates how long the `broker.version.fallback` fallback is used " |
| "in the case the ApiVersionRequest fails. " |
| "**NOTE**: The ApiVersionRequest is only issued when a new connection " |
| "to the broker is made (such as after an upgrade).", |
| 0, 86400*7*1000, 20*60*1000 /* longer than default Idle timeout (10m)*/ }, |
| |
| { _RK_GLOBAL, "broker.version.fallback", _RK_C_STR, |
| _RK(broker_version_fallback), |
| "Older broker versions (<0.10.0) provides no way for a client to query " |
| "for supported protocol features " |
| "(ApiVersionRequest, see `api.version.request`) making it impossible " |
| "for the client to know what features it may use. " |
| "As a workaround a user may set this property to the expected broker " |
| "version and the client will automatically adjust its feature set " |
| "accordingly if the ApiVersionRequest fails (or is disabled). " |
| "The fallback broker version will be used for `api.version.fallback.ms`. " |
| "Valid values are: 0.9.0, 0.8.2, 0.8.1, 0.8.0. Any other value, " |
| "such as 0.10.2.1, enables ApiVersionRequests.", |
| .sdef = "0.9.0", |
| .validate = rd_kafka_conf_validate_broker_version }, |
| |
| /* Security related global properties */ |
| { _RK_GLOBAL, "security.protocol", _RK_C_S2I, |
| _RK(security_protocol), |
| "Protocol used to communicate with brokers.", |
| .vdef = RD_KAFKA_PROTO_PLAINTEXT, |
| .s2i = { |
| { RD_KAFKA_PROTO_PLAINTEXT, "plaintext" }, |
| #if WITH_SSL |
| { RD_KAFKA_PROTO_SSL, "ssl" }, |
| #endif |
| { RD_KAFKA_PROTO_SASL_PLAINTEXT, "sasl_plaintext" }, |
| #if WITH_SSL |
| { RD_KAFKA_PROTO_SASL_SSL, "sasl_ssl" }, |
| #endif |
| { 0, NULL } |
| } }, |
| |
| #if WITH_SSL |
| { _RK_GLOBAL, "ssl.cipher.suites", _RK_C_STR, |
| _RK(ssl.cipher_suites), |
| "A cipher suite is a named combination of authentication, " |
| "encryption, MAC and key exchange algorithm used to negotiate the " |
| "security settings for a network connection using TLS or SSL network " |
| "protocol. See manual page for `ciphers(1)` and " |
| "`SSL_CTX_set_cipher_list(3)." |
| }, |
| { _RK_GLOBAL, "ssl.key.location", _RK_C_STR, |
| _RK(ssl.key_location), |
| "Path to client's private key (PEM) used for authentication." |
| }, |
| { _RK_GLOBAL, "ssl.key.password", _RK_C_STR, |
| _RK(ssl.key_password), |
| "Private key passphrase" |
| }, |
| { _RK_GLOBAL, "ssl.certificate.location", _RK_C_STR, |
| _RK(ssl.cert_location), |
| "Path to client's public key (PEM) used for authentication." |
| }, |
| { _RK_GLOBAL, "ssl.ca.location", _RK_C_STR, |
| _RK(ssl.ca_location), |
| "File or directory path to CA certificate(s) for verifying " |
| "the broker's key." |
| }, |
| { _RK_GLOBAL, "ssl.crl.location", _RK_C_STR, |
| _RK(ssl.crl_location), |
| "Path to CRL for verifying broker's certificate validity." |
| }, |
| #endif /* WITH_SSL */ |
| |
| /* Point user in the right direction if they try to apply |
| * Java client SSL / JAAS properties. */ |
| { _RK_GLOBAL, "ssl.keystore.location", _RK_C_INVALID, |
| _RK(dummy), |
| "Java KeyStores are not supported, use `ssl.key.location` and " |
| "a private key (PEM) file instead. " |
| "See https://github.com/edenhill/librdkafka/wiki/Using-SSL-with-librdkafka for more information." |
| }, |
| { _RK_GLOBAL, "ssl.truststore.location", _RK_C_INVALID, |
| _RK(dummy), |
| "Java TrustStores are not supported, use `ssl.ca.location` " |
| "and a certificate file instead. " |
| "See https://github.com/edenhill/librdkafka/wiki/Using-SSL-with-librdkafka for more information." |
| }, |
| { _RK_GLOBAL, "sasl.jaas.config", _RK_C_INVALID, |
| _RK(dummy), |
| "Java JAAS configuration is not supported, see " |
| "https://github.com/edenhill/librdkafka/wiki/Using-SASL-with-librdkafka " |
| "for more information." |
| }, |
| |
| {_RK_GLOBAL,"sasl.mechanisms", _RK_C_STR, |
| _RK(sasl.mechanisms), |
| "SASL mechanism to use for authentication. " |
| "Supported: GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512. " |
| "**NOTE**: Despite the name only one mechanism must be configured.", |
| .sdef = "GSSAPI", |
| .validate = rd_kafka_conf_validate_single }, |
| { _RK_GLOBAL, "sasl.kerberos.service.name", _RK_C_STR, |
| _RK(sasl.service_name), |
| "Kerberos principal name that Kafka runs as.", |
| .sdef = "kafka" }, |
| { _RK_GLOBAL, "sasl.kerberos.principal", _RK_C_STR, |
| _RK(sasl.principal), |
| "This client's Kerberos principal name.", |
| .sdef = "kafkaclient" }, |
| #ifndef _MSC_VER |
| { _RK_GLOBAL, "sasl.kerberos.kinit.cmd", _RK_C_STR, |
| _RK(sasl.kinit_cmd), |
| "Full kerberos kinit command string, %{config.prop.name} is replaced " |
| "by corresponding config object value, %{broker.name} returns the " |
| "broker's hostname.", |
| .sdef = "kinit -S \"%{sasl.kerberos.service.name}/%{broker.name}\" " |
| "-k -t \"%{sasl.kerberos.keytab}\" %{sasl.kerberos.principal}" }, |
| { _RK_GLOBAL, "sasl.kerberos.keytab", _RK_C_STR, |
| _RK(sasl.keytab), |
| "Path to Kerberos keytab file. Uses system default if not set." |
| "**NOTE**: This is not automatically used but must be added to the " |
| "template in sasl.kerberos.kinit.cmd as " |
| "` ... -t %{sasl.kerberos.keytab}`." }, |
| { _RK_GLOBAL, "sasl.kerberos.min.time.before.relogin", _RK_C_INT, |
| _RK(sasl.relogin_min_time), |
| "Minimum time in milliseconds between key refresh attempts.", |
| 1, 86400*1000, 60*1000 }, |
| #endif |
| { _RK_GLOBAL, "sasl.username", _RK_C_STR, |
| _RK(sasl.username), |
| "SASL username for use with the PLAIN and SASL-SCRAM-.. mechanisms" }, |
| { _RK_GLOBAL, "sasl.password", _RK_C_STR, |
| _RK(sasl.password), |
| "SASL password for use with the PLAIN and SASL-SCRAM-.. mechanism" }, |
| |
| #if WITH_PLUGINS |
| /* Plugins */ |
| { _RK_GLOBAL, "plugin.library.paths", _RK_C_STR, |
| _RK(plugin_paths), |
| "List of plugin libaries to load (; separated). " |
| "The library search path is platform dependent (see dlopen(3) for Unix and LoadLibrary() for Windows). If no filename extension is specified the " |
| "platform-specific extension (such as .dll or .so) will be appended automatically.", |
| .set = rd_kafka_plugins_conf_set }, |
| #endif |
| |
| /* Interceptors are added through specific API and not exposed |
| * as configuration properties. |
| * The interceptor property must be defined after plugin.library.paths |
| * so that the plugin libraries are properly loaded before |
| * interceptors are configured when duplicating configuration objects.*/ |
| { _RK_GLOBAL, "interceptors", _RK_C_INTERNAL, |
| _RK(interceptors), |
| "Interceptors added through rd_kafka_conf_interceptor_add_..() " |
| "and any configuration handled by interceptors.", |
| .ctor = rd_kafka_conf_interceptor_ctor, |
| .dtor = rd_kafka_conf_interceptor_dtor, |
| .copy = rd_kafka_conf_interceptor_copy }, |
| |
| /* Global client group properties */ |
| { _RK_GLOBAL|_RK_CGRP, "group.id", _RK_C_STR, |
| _RK(group_id_str), |
| "Client group id string. All clients sharing the same group.id " |
| "belong to the same group." }, |
| { _RK_GLOBAL|_RK_CGRP, "partition.assignment.strategy", _RK_C_STR, |
| _RK(partition_assignment_strategy), |
| "Name of partition assignment strategy to use when elected " |
| "group leader assigns partitions to group members.", |
| .sdef = "range,roundrobin" }, |
| { _RK_GLOBAL|_RK_CGRP, "session.timeout.ms", _RK_C_INT, |
| _RK(group_session_timeout_ms), |
| "Client group session and failure detection timeout.", |
| 1, 3600*1000, 30*1000 }, |
| { _RK_GLOBAL|_RK_CGRP, "heartbeat.interval.ms", _RK_C_INT, |
| _RK(group_heartbeat_intvl_ms), |
| "Group session keepalive heartbeat interval.", |
| 1, 3600*1000, 1*1000 }, |
| { _RK_GLOBAL|_RK_CGRP, "group.protocol.type", _RK_C_KSTR, |
| _RK(group_protocol_type), |
| "Group protocol type", |
| .sdef = "consumer" }, |
| { _RK_GLOBAL|_RK_CGRP, "coordinator.query.interval.ms", _RK_C_INT, |
| _RK(coord_query_intvl_ms), |
| "How often to query for the current client group coordinator. " |
| "If the currently assigned coordinator is down the configured " |
| "query interval will be divided by ten to more quickly recover " |
| "in case of coordinator reassignment.", |
| 1, 3600*1000, 10*60*1000 }, |
| |
| /* Global consumer properties */ |
| { _RK_GLOBAL|_RK_CONSUMER, "enable.auto.commit", _RK_C_BOOL, |
| _RK(enable_auto_commit), |
| "Automatically and periodically commit offsets in the background.", |
| 0, 1, 1 }, |
| { _RK_GLOBAL|_RK_CONSUMER, "auto.commit.interval.ms", _RK_C_INT, |
| _RK(auto_commit_interval_ms), |
| "The frequency in milliseconds that the consumer offsets " |
| "are committed (written) to offset storage. (0 = disable). " |
| "This setting is used by the high-level consumer.", |
| 0, 86400*1000, 5*1000 }, |
| { _RK_GLOBAL|_RK_CONSUMER, "enable.auto.offset.store", _RK_C_BOOL, |
| _RK(enable_auto_offset_store), |
| "Automatically store offset of last message provided to " |
| "application.", |
| 0, 1, 1 }, |
| { _RK_GLOBAL|_RK_CONSUMER, "queued.min.messages", _RK_C_INT, |
| _RK(queued_min_msgs), |
| "Minimum number of messages per topic+partition " |
| "librdkafka tries to maintain in the local consumer queue.", |
| 1, 10000000, 100000 }, |
| { _RK_GLOBAL|_RK_CONSUMER, "queued.max.messages.kbytes", _RK_C_INT, |
| _RK(queued_max_msg_kbytes), |
| "Maximum number of kilobytes per topic+partition in the " |
| "local consumer queue. " |
| "This value may be overshot by fetch.message.max.bytes. " |
| "This property has higher priority than queued.min.messages.", |
| 1, 1000000000, 1000000 /* 1 Gig */ }, |
| { _RK_GLOBAL|_RK_CONSUMER, "fetch.wait.max.ms", _RK_C_INT, |
| _RK(fetch_wait_max_ms), |
| "Maximum time the broker may wait to fill the response " |
| "with fetch.min.bytes.", |
| 0, 300*1000, 100 }, |
| { _RK_GLOBAL|_RK_CONSUMER, "fetch.message.max.bytes", _RK_C_INT, |
| _RK(fetch_msg_max_bytes), |
| "Initial maximum number of bytes per topic+partition to request when " |
| "fetching messages from the broker. " |
| "If the client encounters a message larger than this value " |
| "it will gradually try to increase it until the " |
| "entire message can be fetched.", |
| 1, 1000000000, 1024*1024 }, |
| { _RK_GLOBAL|_RK_CONSUMER, "max.partition.fetch.bytes", _RK_C_ALIAS, |
| .sdef = "fetch.message.max.bytes" }, |
| { _RK_GLOBAL|_RK_CONSUMER, "fetch.min.bytes", _RK_C_INT, |
| _RK(fetch_min_bytes), |
| "Minimum number of bytes the broker responds with. " |
| "If fetch.wait.max.ms expires the accumulated data will " |
| "be sent to the client regardless of this setting.", |
| 1, 100000000, 1 }, |
| { _RK_GLOBAL|_RK_CONSUMER, "fetch.error.backoff.ms", _RK_C_INT, |
| _RK(fetch_error_backoff_ms), |
| "How long to postpone the next fetch request for a " |
| "topic+partition in case of a fetch error.", |
| 0, 300*1000, 500 }, |
| { _RK_GLOBAL|_RK_CONSUMER, "offset.store.method", _RK_C_S2I, |
| _RK(offset_store_method), |
| "Offset commit store method: " |
| "'file' - local file store (offset.store.path, et.al), " |
| "'broker' - broker commit store " |
| "(requires Apache Kafka 0.8.2 or later on the broker).", |
| .vdef = RD_KAFKA_OFFSET_METHOD_BROKER, |
| .s2i = { |
| { RD_KAFKA_OFFSET_METHOD_NONE, "none" }, |
| { RD_KAFKA_OFFSET_METHOD_FILE, "file" }, |
| { RD_KAFKA_OFFSET_METHOD_BROKER, "broker" } |
| } |
| }, |
| { _RK_GLOBAL|_RK_CONSUMER, "consume_cb", _RK_C_PTR, |
| _RK(consume_cb), |
| "Message consume callback (set with rd_kafka_conf_set_consume_cb())"}, |
| { _RK_GLOBAL|_RK_CONSUMER, "rebalance_cb", _RK_C_PTR, |
| _RK(rebalance_cb), |
| "Called after consumer group has been rebalanced " |
| "(set with rd_kafka_conf_set_rebalance_cb())" }, |
| { _RK_GLOBAL|_RK_CONSUMER, "offset_commit_cb", _RK_C_PTR, |
| _RK(offset_commit_cb), |
| "Offset commit result propagation callback. " |
| "(set with rd_kafka_conf_set_offset_commit_cb())" }, |
| { _RK_GLOBAL|_RK_CONSUMER, "enable.partition.eof", _RK_C_BOOL, |
| _RK(enable_partition_eof), |
| "Emit RD_KAFKA_RESP_ERR__PARTITION_EOF event whenever the " |
| "consumer reaches the end of a partition.", |
| 0, 1, 1 }, |
| { _RK_GLOBAL|_RK_CONSUMER, "check.crcs", _RK_C_BOOL, |
| _RK(check_crcs), |
| "Verify CRC32 of consumed messages, ensuring no on-the-wire or " |
| "on-disk corruption to the messages occurred. This check comes " |
| "at slightly increased CPU usage.", |
| 0, 1, 0 }, |
| /* Global producer properties */ |
| { _RK_GLOBAL|_RK_PRODUCER, "queue.buffering.max.messages", _RK_C_INT, |
| _RK(queue_buffering_max_msgs), |
| "Maximum number of messages allowed on the producer queue.", |
| 1, 10000000, 100000 }, |
| { _RK_GLOBAL|_RK_PRODUCER, "queue.buffering.max.kbytes", _RK_C_INT, |
| _RK(queue_buffering_max_kbytes), |
| "Maximum total message size sum allowed on the producer queue. " |
| "This property has higher priority than queue.buffering.max.messages.", |
| 1, INT_MAX/1024, 4000000 }, |
| { _RK_GLOBAL|_RK_PRODUCER, "queue.buffering.max.ms", _RK_C_INT, |
| _RK(buffering_max_ms), |
| "Delay in milliseconds to wait for messages in the producer queue " |
| "to accumulate before constructing message batches (MessageSets) to " |
| "transmit to brokers. " |
| "A higher value allows larger and more effective " |
| "(less overhead, improved compression) batches of messages to " |
| "accumulate at the expense of increased message delivery latency.", |
| 0, 900*1000, 0 }, |
| { _RK_GLOBAL|_RK_PRODUCER, "linger.ms", _RK_C_ALIAS, |
| .sdef = "queue.buffering.max.ms" }, |
| { _RK_GLOBAL|_RK_PRODUCER, "message.send.max.retries", _RK_C_INT, |
| _RK(max_retries), |
| "How many times to retry sending a failing MessageSet. " |
| "**Note:** retrying may cause reordering.", |
| 0, 10000000, 2 }, |
| { _RK_GLOBAL | _RK_PRODUCER, "retries", _RK_C_ALIAS, |
| .sdef = "message.send.max.retries" }, |
| { _RK_GLOBAL|_RK_PRODUCER, "retry.backoff.ms", _RK_C_INT, |
| _RK(retry_backoff_ms), |
| "The backoff time in milliseconds before retrying a message send.", |
| 1, 300*1000, 100 }, |
| { _RK_GLOBAL|_RK_PRODUCER, "compression.codec", _RK_C_S2I, |
| _RK(compression_codec), |
| "compression codec to use for compressing message sets. " |
| "This is the default value for all topics, may be overriden by " |
| "the topic configuration property `compression.codec`. ", |
| .vdef = RD_KAFKA_COMPRESSION_NONE, |
| .s2i = { |
| { RD_KAFKA_COMPRESSION_NONE, "none" }, |
| #if WITH_ZLIB |
| { RD_KAFKA_COMPRESSION_GZIP, "gzip" }, |
| #endif |
| #if WITH_SNAPPY |
| { RD_KAFKA_COMPRESSION_SNAPPY, "snappy" }, |
| #endif |
| { RD_KAFKA_COMPRESSION_LZ4, "lz4" }, |
| { 0 } |
| } }, |
| { _RK_GLOBAL|_RK_PRODUCER, "batch.num.messages", _RK_C_INT, |
| _RK(batch_num_messages), |
| "Maximum number of messages batched in one MessageSet. " |
| "The total MessageSet size is also limited by message.max.bytes.", |
| 1, 1000000, 10000 }, |
| { _RK_GLOBAL|_RK_PRODUCER, "delivery.report.only.error", _RK_C_BOOL, |
| _RK(dr_err_only), |
| "Only provide delivery reports for failed messages.", |
| 0, 1, 0 }, |
| { _RK_GLOBAL|_RK_PRODUCER, "dr_cb", _RK_C_PTR, |
| _RK(dr_cb), |
| "Delivery report callback (set with rd_kafka_conf_set_dr_cb())" }, |
| { _RK_GLOBAL|_RK_PRODUCER, "dr_msg_cb", _RK_C_PTR, |
| _RK(dr_msg_cb), |
| "Delivery report callback (set with rd_kafka_conf_set_dr_msg_cb())" }, |
| |
| |
| /* |
| * Topic properties |
| */ |
| |
| /* Topic producer properties */ |
| { _RK_TOPIC|_RK_PRODUCER, "request.required.acks", _RK_C_INT, |
| _RKT(required_acks), |
| "This field indicates how many acknowledgements the leader broker " |
| "must receive from ISR brokers before responding to the request: " |
| "*0*=Broker does not send any response/ack to client, " |
| "*1*=Only the leader broker will need to ack the message, " |
| "*-1* or *all*=broker will block until message is committed by all " |
| "in sync replicas (ISRs) or broker's `in.sync.replicas` " |
| "setting before sending response. ", |
| -1, 1000, 1, |
| .s2i = { |
| { -1, "all" }, |
| } |
| }, |
| { _RK_TOPIC | _RK_PRODUCER, "acks", _RK_C_ALIAS, |
| .sdef = "request.required.acks" }, |
| |
| { _RK_TOPIC|_RK_PRODUCER, "request.timeout.ms", _RK_C_INT, |
| _RKT(request_timeout_ms), |
| "The ack timeout of the producer request in milliseconds. " |
| "This value is only enforced by the broker and relies " |
| "on `request.required.acks` being != 0.", |
| 1, 900*1000, 5*1000 }, |
| { _RK_TOPIC|_RK_PRODUCER, "message.timeout.ms", _RK_C_INT, |
| _RKT(message_timeout_ms), |
| "Local message timeout. " |
| "This value is only enforced locally and limits the time a " |
| "produced message waits for successful delivery. " |
| "A time of 0 is infinite.", |
| 0, 900*1000, 300*1000 }, |
| { _RK_TOPIC|_RK_PRODUCER, "produce.offset.report", _RK_C_BOOL, |
| _RKT(produce_offset_report), |
| "Report offset of produced message back to application. " |
| "The application must be use the `dr_msg_cb` to retrieve the offset " |
| "from `rd_kafka_message_t.offset`.", |
| 0, 1, 0 }, |
| { _RK_TOPIC|_RK_PRODUCER, "partitioner_cb", _RK_C_PTR, |
| _RKT(partitioner), |
| "Partitioner callback " |
| "(set with rd_kafka_topic_conf_set_partitioner_cb())" }, |
| { _RK_TOPIC, "opaque", _RK_C_PTR, |
| _RKT(opaque), |
| "Application opaque (set with rd_kafka_topic_conf_set_opaque())" }, |
| { _RK_TOPIC | _RK_PRODUCER, "compression.codec", _RK_C_S2I, |
| _RKT(compression_codec), |
| "Compression codec to use for compressing message sets. ", |
| .vdef = RD_KAFKA_COMPRESSION_INHERIT, |
| .s2i = { |
| { RD_KAFKA_COMPRESSION_NONE, "none" }, |
| #if WITH_ZLIB |
| { RD_KAFKA_COMPRESSION_GZIP, "gzip" }, |
| #endif |
| #if WITH_SNAPPY |
| { RD_KAFKA_COMPRESSION_SNAPPY, "snappy" }, |
| #endif |
| { RD_KAFKA_COMPRESSION_LZ4, "lz4" }, |
| { RD_KAFKA_COMPRESSION_INHERIT, "inherit" }, |
| { 0 } |
| } }, |
| |
| |
| /* Topic consumer properties */ |
| { _RK_TOPIC|_RK_CONSUMER, "auto.commit.enable", _RK_C_BOOL, |
| _RKT(auto_commit), |
| "If true, periodically commit offset of the last message handed " |
| "to the application. This committed offset will be used when the " |
| "process restarts to pick up where it left off. " |
| "If false, the application will have to call " |
| "`rd_kafka_offset_store()` to store an offset (optional). " |
| "**NOTE:** This property should only be used with the simple " |
| "legacy consumer, when using the high-level KafkaConsumer the global " |
| "`enable.auto.commit` property must be used instead. " |
| "**NOTE:** There is currently no zookeeper integration, offsets " |
| "will be written to broker or local file according to " |
| "offset.store.method.", |
| 0, 1, 1 }, |
| { _RK_TOPIC|_RK_CONSUMER, "enable.auto.commit", _RK_C_ALIAS, |
| .sdef = "auto.commit.enable" }, |
| { _RK_TOPIC|_RK_CONSUMER, "auto.commit.interval.ms", _RK_C_INT, |
| _RKT(auto_commit_interval_ms), |
| "The frequency in milliseconds that the consumer offsets " |
| "are committed (written) to offset storage. " |
| "This setting is used by the low-level legacy consumer.", |
| 10, 86400*1000, 60*1000 }, |
| { _RK_TOPIC|_RK_CONSUMER, "auto.offset.reset", _RK_C_S2I, |
| _RKT(auto_offset_reset), |
| "Action to take when there is no initial offset in offset store " |
| "or the desired offset is out of range: " |
| "'smallest','earliest' - automatically reset the offset to the smallest offset, " |
| "'largest','latest' - automatically reset the offset to the largest offset, " |
| "'error' - trigger an error which is retrieved by consuming messages " |
| "and checking 'message->err'.", |
| .vdef = RD_KAFKA_OFFSET_END, |
| .s2i = { |
| { RD_KAFKA_OFFSET_BEGINNING, "smallest" }, |
| { RD_KAFKA_OFFSET_BEGINNING, "earliest" }, |
| { RD_KAFKA_OFFSET_BEGINNING, "beginning" }, |
| { RD_KAFKA_OFFSET_END, "largest" }, |
| { RD_KAFKA_OFFSET_END, "latest" }, |
| { RD_KAFKA_OFFSET_END, "end" }, |
| { RD_KAFKA_OFFSET_INVALID, "error" }, |
| } |
| }, |
| { _RK_TOPIC|_RK_CONSUMER, "offset.store.path", _RK_C_STR, |
| _RKT(offset_store_path), |
| "Path to local file for storing offsets. If the path is a directory " |
| "a filename will be automatically generated in that directory based " |
| "on the topic and partition.", |
| .sdef = "." }, |
| |
| { _RK_TOPIC|_RK_CONSUMER, "offset.store.sync.interval.ms", _RK_C_INT, |
| _RKT(offset_store_sync_interval_ms), |
| "fsync() interval for the offset file, in milliseconds. " |
| "Use -1 to disable syncing, and 0 for immediate sync after " |
| "each write.", |
| -1, 86400*1000, -1 }, |
| |
| { _RK_TOPIC|_RK_CONSUMER, "offset.store.method", _RK_C_S2I, |
| _RKT(offset_store_method), |
| "Offset commit store method: " |
| "'file' - local file store (offset.store.path, et.al), " |
| "'broker' - broker commit store " |
| "(requires \"group.id\" to be configured and " |
| "Apache Kafka 0.8.2 or later on the broker.).", |
| .vdef = RD_KAFKA_OFFSET_METHOD_BROKER, |
| .s2i = { |
| { RD_KAFKA_OFFSET_METHOD_FILE, "file" }, |
| { RD_KAFKA_OFFSET_METHOD_BROKER, "broker" } |
| } |
| }, |
| |
| { _RK_TOPIC|_RK_CONSUMER, "consume.callback.max.messages", _RK_C_INT, |
| _RKT(consume_callback_max_msgs), |
| "Maximum number of messages to dispatch in " |
| "one `rd_kafka_consume_callback*()` call (0 = unlimited)", |
| 0, 1000000, 0 }, |
| |
| { 0, /* End */ } |
| }; |
| |
| |
| static rd_kafka_conf_res_t |
| rd_kafka_anyconf_set_prop0 (int scope, void *conf, |
| const struct rd_kafka_property *prop, |
| const char *istr, int ival, rd_kafka_conf_set_mode_t set_mode, |
| char *errstr, size_t errstr_size) { |
| rd_kafka_conf_res_t res; |
| |
| #define _RK_PTR(TYPE,BASE,OFFSET) (TYPE)(void *)(((char *)(BASE))+(OFFSET)) |
| |
| /* Try interceptors first (only for GLOBAL config) */ |
| if (scope & _RK_GLOBAL) { |
| if (prop->type == _RK_C_PTR || prop->type == _RK_C_INTERNAL) |
| res = RD_KAFKA_CONF_UNKNOWN; |
| else |
| res = rd_kafka_interceptors_on_conf_set(conf, |
| prop->name, |
| istr, |
| errstr, |
| errstr_size); |
| if (res != RD_KAFKA_CONF_UNKNOWN) |
| return res; |
| } |
| |
| |
| if (prop->set) { |
| /* Custom setter */ |
| rd_kafka_conf_res_t res; |
| |
| res = prop->set(scope, conf, prop->name, istr, |
| _RK_PTR(void *, conf, prop->offset), |
| set_mode, errstr, errstr_size); |
| |
| if (res != RD_KAFKA_CONF_OK) |
| return res; |
| |
| /* FALLTHRU so that property value is set. */ |
| } |
| |
| switch (prop->type) |
| { |
| case _RK_C_STR: |
| { |
| char **str = _RK_PTR(char **, conf, prop->offset); |
| if (*str) |
| rd_free(*str); |
| if (istr) |
| *str = rd_strdup(istr); |
| else |
| *str = prop->sdef ? rd_strdup(prop->sdef) : NULL; |
| return RD_KAFKA_CONF_OK; |
| } |
| case _RK_C_KSTR: |
| { |
| rd_kafkap_str_t **kstr = _RK_PTR(rd_kafkap_str_t **, conf, |
| prop->offset); |
| if (*kstr) |
| rd_kafkap_str_destroy(*kstr); |
| if (istr) |
| *kstr = rd_kafkap_str_new(istr, -1); |
| else |
| *kstr = prop->sdef ? |
| rd_kafkap_str_new(prop->sdef, -1) : NULL; |
| return RD_KAFKA_CONF_OK; |
| } |
| case _RK_C_PTR: |
| *_RK_PTR(const void **, conf, prop->offset) = istr; |
| return RD_KAFKA_CONF_OK; |
| case _RK_C_BOOL: |
| case _RK_C_INT: |
| case _RK_C_S2I: |
| case _RK_C_S2F: |
| { |
| int *val = _RK_PTR(int *, conf, prop->offset); |
| |
| if (prop->type == _RK_C_S2F) { |
| switch (set_mode) |
| { |
| case _RK_CONF_PROP_SET_REPLACE: |
| *val = ival; |
| break; |
| case _RK_CONF_PROP_SET_ADD: |
| *val |= ival; |
| break; |
| case _RK_CONF_PROP_SET_DEL: |
| *val &= ~ival; |
| break; |
| } |
| } else { |
| /* Single assignment */ |
| *val = ival; |
| |
| } |
| return RD_KAFKA_CONF_OK; |
| } |
| case _RK_C_PATLIST: |
| { |
| /* Split comma-separated list into individual regex expressions |
| * that are verified and then append to the provided list. */ |
| rd_kafka_pattern_list_t **plist; |
| |
| plist = _RK_PTR(rd_kafka_pattern_list_t **, conf, prop->offset); |
| |
| if (*plist) |
| rd_kafka_pattern_list_destroy(*plist); |
| |
| if (istr) { |
| if (!(*plist = |
| rd_kafka_pattern_list_new(istr, |
| errstr, |
| (int)errstr_size))) |
| return RD_KAFKA_CONF_INVALID; |
| } else |
| *plist = NULL; |
| |
| return RD_KAFKA_CONF_OK; |
| } |
| |
| case _RK_C_INTERNAL: |
| /* Probably handled by setter */ |
| return RD_KAFKA_CONF_OK; |
| |
| default: |
| rd_kafka_assert(NULL, !*"unknown conf type"); |
| } |
| |
| /* unreachable */ |
| return RD_KAFKA_CONF_INVALID; |
| } |
| |
| |
| /** |
| * @brief Find s2i (string-to-int mapping) entry and return its array index, |
| * or -1 on miss. |
| */ |
| static int rd_kafka_conf_s2i_find (const struct rd_kafka_property *prop, |
| const char *value) { |
| int j; |
| |
| for (j = 0 ; j < (int)RD_ARRAYSIZE(prop->s2i); j++) { |
| if (prop->s2i[j].str && |
| !rd_strcasecmp(prop->s2i[j].str, value)) |
| return j; |
| } |
| |
| return -1; |
| } |
| |
| |
| static rd_kafka_conf_res_t |
| rd_kafka_anyconf_set_prop (int scope, void *conf, |
| const struct rd_kafka_property *prop, |
| const char *value, |
| char *errstr, size_t errstr_size) { |
| int ival; |
| |
| switch (prop->type) |
| { |
| case _RK_C_STR: |
| case _RK_C_KSTR: |
| if (prop->s2i[0].str) { |
| int match; |
| |
| if (!value || |
| (match = rd_kafka_conf_s2i_find(prop, value)) == -1){ |
| rd_snprintf(errstr, errstr_size, |
| "Invalid value for " |
| "configuration property \"%s\": " |
| "%s", |
| prop->name, value); |
| return RD_KAFKA_CONF_INVALID; |
| } |
| |
| /* Replace value string with canonical form */ |
| value = prop->s2i[match].str; |
| } |
| /* FALLTHRU */ |
| case _RK_C_PATLIST: |
| if (prop->validate && |
| (!value || !prop->validate(prop, value, -1))) { |
| rd_snprintf(errstr, errstr_size, |
| "Invalid value for " |
| "configuration property \"%s\": %s", |
| prop->name, value); |
| return RD_KAFKA_CONF_INVALID; |
| } |
| |
| return rd_kafka_anyconf_set_prop0(scope, conf, prop, value, 0, |
| _RK_CONF_PROP_SET_REPLACE, |
| errstr, errstr_size); |
| |
| case _RK_C_PTR: |
| rd_snprintf(errstr, errstr_size, |
| "Property \"%s\" must be set through dedicated " |
| ".._set_..() function", prop->name); |
| return RD_KAFKA_CONF_INVALID; |
| |
| case _RK_C_BOOL: |
| if (!value) { |
| rd_snprintf(errstr, errstr_size, |
| "Bool configuration property \"%s\" cannot " |
| "be set to empty value", prop->name); |
| return RD_KAFKA_CONF_INVALID; |
| } |
| |
| |
| if (!rd_strcasecmp(value, "true") || |
| !rd_strcasecmp(value, "t") || |
| !strcmp(value, "1")) |
| ival = 1; |
| else if (!rd_strcasecmp(value, "false") || |
| !rd_strcasecmp(value, "f") || |
| !strcmp(value, "0")) |
| ival = 0; |
| else { |
| rd_snprintf(errstr, errstr_size, |
| "Expected bool value for \"%s\": " |
| "true or false", prop->name); |
| return RD_KAFKA_CONF_INVALID; |
| } |
| |
| rd_kafka_anyconf_set_prop0(scope, conf, prop, value, ival, |
| _RK_CONF_PROP_SET_REPLACE, |
| errstr, errstr_size); |
| return RD_KAFKA_CONF_OK; |
| |
| case _RK_C_INT: |
| { |
| const char *end; |
| |
| if (!value) { |
| rd_snprintf(errstr, errstr_size, |
| "Integer configuration " |
| "property \"%s\" cannot be set " |
| "to empty value", prop->name); |
| return RD_KAFKA_CONF_INVALID; |
| } |
| |
| ival = (int)strtol(value, (char **)&end, 0); |
| if (end == value) { |
| /* Non numeric, check s2i for string mapping */ |
| int match = rd_kafka_conf_s2i_find(prop, value); |
| |
| if (match == -1) { |
| rd_snprintf(errstr, errstr_size, |
| "Invalid value for " |
| "configuration property \"%s\"", |
| prop->name); |
| return RD_KAFKA_CONF_INVALID; |
| } |
| |
| ival = prop->s2i[match].val; |
| } |
| |
| if (ival < prop->vmin || |
| ival > prop->vmax) { |
| rd_snprintf(errstr, errstr_size, |
| "Configuration property \"%s\" value " |
| "%i is outside allowed range %i..%i\n", |
| prop->name, ival, |
| prop->vmin, |
| prop->vmax); |
| return RD_KAFKA_CONF_INVALID; |
| } |
| |
| rd_kafka_anyconf_set_prop0(scope, conf, prop, value, ival, |
| _RK_CONF_PROP_SET_REPLACE, |
| errstr, errstr_size); |
| return RD_KAFKA_CONF_OK; |
| } |
| |
| case _RK_C_S2I: |
| case _RK_C_S2F: |
| { |
| int j; |
| const char *next; |
| |
| if (!value) { |
| rd_snprintf(errstr, errstr_size, |
| "Configuration " |
| "property \"%s\" cannot be set " |
| "to empty value", prop->name); |
| return RD_KAFKA_CONF_INVALID; |
| } |
| |
| next = value; |
| while (next && *next) { |
| const char *s, *t; |
| rd_kafka_conf_set_mode_t set_mode = _RK_CONF_PROP_SET_ADD; /* S2F */ |
| |
| s = next; |
| |
| if (prop->type == _RK_C_S2F && |
| (t = strchr(s, ','))) { |
| /* CSV flag field */ |
| next = t+1; |
| } else { |
| /* Single string */ |
| t = s+strlen(s); |
| next = NULL; |
| } |
| |
| |
| /* Left trim */ |
| while (s < t && isspace((int)*s)) |
| s++; |
| |
| /* Right trim */ |
| while (t > s && isspace((int)*t)) |
| t--; |
| |
| /* S2F: +/- prefix */ |
| if (prop->type == _RK_C_S2F) { |
| if (*s == '+') { |
| set_mode = _RK_CONF_PROP_SET_ADD; |
| s++; |
| } else if (*s == '-') { |
| set_mode = _RK_CONF_PROP_SET_DEL; |
| s++; |
| } |
| } |
| |
| /* Empty string? */ |
| if (s == t) |
| continue; |
| |
| /* Match string to s2i table entry */ |
| for (j = 0 ; j < (int)RD_ARRAYSIZE(prop->s2i); j++) { |
| int new_val; |
| |
| if (!prop->s2i[j].str) |
| continue; |
| |
| if (strlen(prop->s2i[j].str) == (size_t)(t-s) && |
| !rd_strncasecmp(prop->s2i[j].str, s, |
| (int)(t-s))) |
| new_val = prop->s2i[j].val; |
| else |
| continue; |
| |
| rd_kafka_anyconf_set_prop0(scope, conf, prop, |
| value, new_val, |
| set_mode, |
| errstr, errstr_size); |
| |
| if (prop->type == _RK_C_S2F) { |
| /* Flags: OR it in: do next */ |
| break; |
| } else { |
| /* Single assignment */ |
| return RD_KAFKA_CONF_OK; |
| } |
| } |
| |
| /* S2F: Good match: continue with next */ |
| if (j < (int)RD_ARRAYSIZE(prop->s2i)) |
| continue; |
| |
| /* No match */ |
| rd_snprintf(errstr, errstr_size, |
| "Invalid value for " |
| "configuration property \"%s\"", prop->name); |
| return RD_KAFKA_CONF_INVALID; |
| |
| } |
| return RD_KAFKA_CONF_OK; |
| } |
| |
| case _RK_C_INTERNAL: |
| rd_snprintf(errstr, errstr_size, |
| "Internal property \"%s\" not settable", |
| prop->name); |
| return RD_KAFKA_CONF_INVALID; |
| |
| case _RK_C_INVALID: |
| rd_snprintf(errstr, errstr_size, "%s", prop->desc); |
| return RD_KAFKA_CONF_INVALID; |
| |
| default: |
| rd_kafka_assert(NULL, !*"unknown conf type"); |
| } |
| |
| /* not reachable */ |
| return RD_KAFKA_CONF_INVALID; |
| } |
| |
| |
| |
| static void rd_kafka_defaultconf_set (int scope, void *conf) { |
| const struct rd_kafka_property *prop; |
| |
| for (prop = rd_kafka_properties ; prop->name ; prop++) { |
| if (!(prop->scope & scope)) |
| continue; |
| |
| if (prop->type == _RK_C_ALIAS || prop->type == _RK_C_INVALID) |
| continue; |
| |
| if (prop->ctor) |
| prop->ctor(scope, conf); |
| |
| if (prop->sdef || prop->vdef || prop->pdef) |
| rd_kafka_anyconf_set_prop0(scope, conf, prop, |
| prop->sdef ? |
| prop->sdef : prop->pdef, |
| prop->vdef, |
| _RK_CONF_PROP_SET_REPLACE, |
| NULL, 0); |
| } |
| } |
| |
| rd_kafka_conf_t *rd_kafka_conf_new (void) { |
| rd_kafka_conf_t *conf = rd_calloc(1, sizeof(*conf)); |
| rd_kafka_defaultconf_set(_RK_GLOBAL, conf); |
| return conf; |
| } |
| |
| rd_kafka_topic_conf_t *rd_kafka_topic_conf_new (void) { |
| rd_kafka_topic_conf_t *tconf = rd_calloc(1, sizeof(*tconf)); |
| rd_kafka_defaultconf_set(_RK_TOPIC, tconf); |
| return tconf; |
| } |
| |
| |
| |
| static int rd_kafka_anyconf_set (int scope, void *conf, |
| const char *name, const char *value, |
| char *errstr, size_t errstr_size) { |
| char estmp[1]; |
| const struct rd_kafka_property *prop; |
| rd_kafka_conf_res_t res; |
| |
| if (!errstr) { |
| errstr = estmp; |
| errstr_size = 0; |
| } |
| |
| if (value && !*value) |
| value = NULL; |
| |
| /* Try interceptors first (only for GLOBAL config for now) */ |
| if (scope & _RK_GLOBAL) { |
| res = rd_kafka_interceptors_on_conf_set( |
| (rd_kafka_conf_t *)conf, name, value, |
| errstr, errstr_size); |
| /* Handled (successfully or not) by interceptor. */ |
| if (res != RD_KAFKA_CONF_UNKNOWN) |
| return res; |
| } |
| |
| /* Then global config */ |
| |
| |
| for (prop = rd_kafka_properties ; prop->name ; prop++) { |
| |
| if (!(prop->scope & scope)) |
| continue; |
| |
| if (strcmp(prop->name, name)) |
| continue; |
| |
| if (prop->type == _RK_C_ALIAS) |
| return rd_kafka_anyconf_set(scope, conf, |
| prop->sdef, value, |
| errstr, errstr_size); |
| |
| return rd_kafka_anyconf_set_prop(scope, conf, prop, value, |
| errstr, errstr_size); |
| } |
| |
| rd_snprintf(errstr, errstr_size, |
| "No such configuration property: \"%s\"", name); |
| |
| return RD_KAFKA_CONF_UNKNOWN; |
| } |
| |
| |
| rd_kafka_conf_res_t rd_kafka_conf_set (rd_kafka_conf_t *conf, |
| const char *name, |
| const char *value, |
| char *errstr, size_t errstr_size) { |
| rd_kafka_conf_res_t res; |
| |
| res = rd_kafka_anyconf_set(_RK_GLOBAL, conf, name, value, |
| errstr, errstr_size); |
| if (res != RD_KAFKA_CONF_UNKNOWN) |
| return res; |
| |
| /* Fallthru: |
| * If the global property was unknown, try setting it on the |
| * default topic config. */ |
| if (!conf->topic_conf) { |
| /* Create topic config, might be over-written by application |
| * later. */ |
| conf->topic_conf = rd_kafka_topic_conf_new(); |
| } |
| |
| return rd_kafka_topic_conf_set(conf->topic_conf, name, value, |
| errstr, errstr_size); |
| } |
| |
| |
| rd_kafka_conf_res_t rd_kafka_topic_conf_set (rd_kafka_topic_conf_t *conf, |
| const char *name, |
| const char *value, |
| char *errstr, size_t errstr_size) { |
| if (!strncmp(name, "topic.", strlen("topic."))) |
| name += strlen("topic."); |
| |
| return rd_kafka_anyconf_set(_RK_TOPIC, conf, name, value, |
| errstr, errstr_size); |
| } |
| |
| |
| static void rd_kafka_anyconf_clear (int scope, void *conf, |
| const struct rd_kafka_property *prop) { |
| switch (prop->type) |
| { |
| case _RK_C_STR: |
| { |
| char **str = _RK_PTR(char **, conf, prop->offset); |
| |
| if (*str) { |
| if (prop->set) { |
| prop->set(scope, conf, prop->name, NULL, *str, |
| _RK_CONF_PROP_SET_DEL, NULL, 0); |
| /* FALLTHRU */ |
| } |
| rd_free(*str); |
| *str = NULL; |
| } |
| } |
| break; |
| |
| case _RK_C_KSTR: |
| { |
| rd_kafkap_str_t **kstr = _RK_PTR(rd_kafkap_str_t **, conf, |
| prop->offset); |
| if (*kstr) { |
| rd_kafkap_str_destroy(*kstr); |
| *kstr = NULL; |
| } |
| } |
| break; |
| |
| case _RK_C_PATLIST: |
| { |
| rd_kafka_pattern_list_t **plist; |
| plist = _RK_PTR(rd_kafka_pattern_list_t **, conf, prop->offset); |
| if (*plist) { |
| rd_kafka_pattern_list_destroy(*plist); |
| *plist = NULL; |
| } |
| } |
| break; |
| |
| case _RK_C_PTR: |
| if (_RK_PTR(void *, conf, prop->offset) != NULL) { |
| if (!strcmp(prop->name, "default_topic_conf")) { |
| rd_kafka_topic_conf_t **tconf; |
| |
| tconf = _RK_PTR(rd_kafka_topic_conf_t **, |
| conf, prop->offset); |
| if (*tconf) { |
| rd_kafka_topic_conf_destroy(*tconf); |
| *tconf = NULL; |
| } |
| } |
| } |
| break; |
| |
| default: |
| break; |
| } |
| |
| if (prop->dtor) |
| prop->dtor(scope, conf); |
| |
| } |
| |
| void rd_kafka_anyconf_destroy (int scope, void *conf) { |
| const struct rd_kafka_property *prop; |
| |
| /* Call on_conf_destroy() interceptors */ |
| if (scope == _RK_GLOBAL) |
| rd_kafka_interceptors_on_conf_destroy(conf); |
| |
| for (prop = rd_kafka_properties; prop->name ; prop++) { |
| if (!(prop->scope & scope)) |
| continue; |
| |
| rd_kafka_anyconf_clear(scope, conf, prop); |
| } |
| } |
| |
| |
| void rd_kafka_conf_destroy (rd_kafka_conf_t *conf) { |
| rd_kafka_anyconf_destroy(_RK_GLOBAL, conf); |
| //FIXME: partition_assignors |
| rd_free(conf); |
| } |
| |
| void rd_kafka_topic_conf_destroy (rd_kafka_topic_conf_t *topic_conf) { |
| rd_kafka_anyconf_destroy(_RK_TOPIC, topic_conf); |
| rd_free(topic_conf); |
| } |
| |
| |
| |
| static void rd_kafka_anyconf_copy (int scope, void *dst, const void *src, |
| size_t filter_cnt, const char **filter) { |
| const struct rd_kafka_property *prop; |
| |
| for (prop = rd_kafka_properties ; prop->name ; prop++) { |
| const char *val = NULL; |
| int ival = 0; |
| char *valstr; |
| size_t valsz; |
| size_t fi; |
| size_t nlen; |
| |
| if (!(prop->scope & scope)) |
| continue; |
| |
| if (prop->type == _RK_C_ALIAS || prop->type == _RK_C_INVALID) |
| continue; |
| |
| /* Apply filter, if any. */ |
| nlen = strlen(prop->name); |
| for (fi = 0 ; fi < filter_cnt ; fi++) { |
| size_t flen = strlen(filter[fi]); |
| if (nlen >= flen && |
| !strncmp(filter[fi], prop->name, flen)) |
| break; |
| } |
| if (fi < filter_cnt) |
| continue; /* Filter matched */ |
| |
| switch (prop->type) |
| { |
| case _RK_C_STR: |
| case _RK_C_PTR: |
| val = *_RK_PTR(const char **, src, prop->offset); |
| |
| if (!strcmp(prop->name, "default_topic_conf") && val) |
| val = (void *)rd_kafka_topic_conf_dup( |
| (const rd_kafka_topic_conf_t *) |
| (void *)val); |
| break; |
| case _RK_C_KSTR: |
| { |
| rd_kafkap_str_t **kstr = _RK_PTR(rd_kafkap_str_t **, |
| src, prop->offset); |
| if (*kstr) |
| val = (*kstr)->str; |
| break; |
| } |
| |
| case _RK_C_BOOL: |
| case _RK_C_INT: |
| case _RK_C_S2I: |
| case _RK_C_S2F: |
| ival = *_RK_PTR(const int *, src, prop->offset); |
| |
| /* Get string representation of configuration value. */ |
| valsz = 0; |
| rd_kafka_anyconf_get0(src, prop, NULL, &valsz); |
| valstr = rd_alloca(valsz); |
| rd_kafka_anyconf_get0(src, prop, valstr, &valsz); |
| val = valstr; |
| break; |
| case _RK_C_PATLIST: |
| { |
| const rd_kafka_pattern_list_t **plist; |
| plist = _RK_PTR(const rd_kafka_pattern_list_t **, |
| src, prop->offset); |
| if (*plist) |
| val = (*plist)->rkpl_orig; |
| break; |
| } |
| case _RK_C_INTERNAL: |
| /* Handled by ->copy() below. */ |
| break; |
| default: |
| continue; |
| } |
| |
| if (prop->copy) |
| prop->copy(scope, dst, src, |
| _RK_PTR(void *, dst, prop->offset), |
| _RK_PTR(const void *, src, prop->offset), |
| filter_cnt, filter); |
| |
| rd_kafka_anyconf_set_prop0(scope, dst, prop, val, ival, |
| _RK_CONF_PROP_SET_REPLACE, NULL, 0); |
| } |
| } |
| |
| |
| rd_kafka_conf_t *rd_kafka_conf_dup (const rd_kafka_conf_t *conf) { |
| rd_kafka_conf_t *new = rd_kafka_conf_new(); |
| |
| rd_kafka_interceptors_on_conf_dup(new, conf, 0, NULL); |
| |
| rd_kafka_anyconf_copy(_RK_GLOBAL, new, conf, 0, NULL); |
| |
| return new; |
| } |
| |
| rd_kafka_conf_t *rd_kafka_conf_dup_filter (const rd_kafka_conf_t *conf, |
| size_t filter_cnt, |
| const char **filter) { |
| rd_kafka_conf_t *new = rd_kafka_conf_new(); |
| |
| rd_kafka_interceptors_on_conf_dup(new, conf, filter_cnt, filter); |
| |
| rd_kafka_anyconf_copy(_RK_GLOBAL, new, conf, filter_cnt, filter); |
| |
| return new; |
| } |
| |
| |
| rd_kafka_topic_conf_t *rd_kafka_topic_conf_dup (const rd_kafka_topic_conf_t |
| *conf) { |
| rd_kafka_topic_conf_t *new = rd_kafka_topic_conf_new(); |
| |
| rd_kafka_anyconf_copy(_RK_TOPIC, new, conf, 0, NULL); |
| |
| return new; |
| } |
| |
| |
| void rd_kafka_conf_set_events (rd_kafka_conf_t *conf, int events) { |
| conf->enabled_events = events; |
| } |
| |
| |
| void rd_kafka_conf_set_dr_cb (rd_kafka_conf_t *conf, |
| void (*dr_cb) (rd_kafka_t *rk, |
| void *payload, size_t len, |
| rd_kafka_resp_err_t err, |
| void *opaque, void *msg_opaque)) { |
| conf->dr_cb = dr_cb; |
| } |
| |
| |
| void rd_kafka_conf_set_dr_msg_cb (rd_kafka_conf_t *conf, |
| void (*dr_msg_cb) (rd_kafka_t *rk, |
| const rd_kafka_message_t * |
| rkmessage, |
| void *opaque)) { |
| conf->dr_msg_cb = dr_msg_cb; |
| } |
| |
| |
| void rd_kafka_conf_set_consume_cb (rd_kafka_conf_t *conf, |
| void (*consume_cb) (rd_kafka_message_t * |
| rkmessage, |
| void *opaque)) { |
| conf->consume_cb = consume_cb; |
| } |
| |
| void rd_kafka_conf_set_rebalance_cb ( |
| rd_kafka_conf_t *conf, |
| void (*rebalance_cb) (rd_kafka_t *rk, |
| rd_kafka_resp_err_t err, |
| rd_kafka_topic_partition_list_t *partitions, |
| void *opaque)) { |
| conf->rebalance_cb = rebalance_cb; |
| } |
| |
| void rd_kafka_conf_set_offset_commit_cb ( |
| rd_kafka_conf_t *conf, |
| void (*offset_commit_cb) (rd_kafka_t *rk, |
| rd_kafka_resp_err_t err, |
| rd_kafka_topic_partition_list_t *offsets, |
| void *opaque)) { |
| conf->offset_commit_cb = offset_commit_cb; |
| } |
| |
| |
| |
| void rd_kafka_conf_set_error_cb (rd_kafka_conf_t *conf, |
| void (*error_cb) (rd_kafka_t *rk, int err, |
| const char *reason, |
| void *opaque)) { |
| conf->error_cb = error_cb; |
| } |
| |
| |
| void rd_kafka_conf_set_throttle_cb (rd_kafka_conf_t *conf, |
| void (*throttle_cb) ( |
| rd_kafka_t *rk, |
| const char *broker_name, |
| int32_t broker_id, |
| int throttle_time_ms, |
| void *opaque)) { |
| conf->throttle_cb = throttle_cb; |
| } |
| |
| |
| void rd_kafka_conf_set_log_cb (rd_kafka_conf_t *conf, |
| void (*log_cb) (const rd_kafka_t *rk, int level, |
| const char *fac, const char *buf)) { |
| conf->log_cb = log_cb; |
| } |
| |
| |
| void rd_kafka_conf_set_stats_cb (rd_kafka_conf_t *conf, |
| int (*stats_cb) (rd_kafka_t *rk, |
| char *json, |
| size_t json_len, |
| void *opaque)) { |
| conf->stats_cb = stats_cb; |
| } |
| |
| void rd_kafka_conf_set_socket_cb (rd_kafka_conf_t *conf, |
| int (*socket_cb) (int domain, int type, |
| int protocol, |
| void *opaque)) { |
| conf->socket_cb = socket_cb; |
| } |
| |
| void |
| rd_kafka_conf_set_connect_cb (rd_kafka_conf_t *conf, |
| int (*connect_cb) (int sockfd, |
| const struct sockaddr *addr, |
| int addrlen, |
| const char *id, |
| void *opaque)) { |
| conf->connect_cb = connect_cb; |
| } |
| |
| void |
| rd_kafka_conf_set_closesocket_cb (rd_kafka_conf_t *conf, |
| int (*closesocket_cb) (int sockfd, |
| void *opaque)) { |
| conf->closesocket_cb = closesocket_cb; |
| } |
| |
| |
| |
| #ifndef _MSC_VER |
| void rd_kafka_conf_set_open_cb (rd_kafka_conf_t *conf, |
| int (*open_cb) (const char *pathname, |
| int flags, mode_t mode, |
| void *opaque)) { |
| conf->open_cb = open_cb; |
| } |
| #endif |
| |
| void rd_kafka_conf_set_opaque (rd_kafka_conf_t *conf, void *opaque) { |
| conf->opaque = opaque; |
| } |
| |
| |
| void rd_kafka_conf_set_default_topic_conf (rd_kafka_conf_t *conf, |
| rd_kafka_topic_conf_t *tconf) { |
| if (conf->topic_conf) |
| rd_kafka_topic_conf_destroy(conf->topic_conf); |
| |
| conf->topic_conf = tconf; |
| } |
| |
| |
| void |
| rd_kafka_topic_conf_set_partitioner_cb (rd_kafka_topic_conf_t *topic_conf, |
| int32_t (*partitioner) ( |
| const rd_kafka_topic_t *rkt, |
| const void *keydata, |
| size_t keylen, |
| int32_t partition_cnt, |
| void *rkt_opaque, |
| void *msg_opaque)) { |
| topic_conf->partitioner = partitioner; |
| } |
| |
| void rd_kafka_topic_conf_set_opaque (rd_kafka_topic_conf_t *topic_conf, |
| void *opaque) { |
| topic_conf->opaque = opaque; |
| } |
| |
| |
| |
| |
| /** |
| * @brief Convert flags \p ival to csv-string using S2F property \p prop. |
| * |
| * This function has two modes: size query and write. |
| * To query for needed size call with dest==NULL, |
| * to write to buffer of size dest_size call with dest!=NULL. |
| * |
| * An \p ival of -1 means all. |
| * |
| * @returns the number of bytes written to \p dest (if not NULL), else the |
| * total number of bytes needed. |
| * |
| */ |
| size_t rd_kafka_conf_flags2str (char *dest, size_t dest_size, const char *delim, |
| const struct rd_kafka_property *prop, |
| int ival) { |
| size_t of = 0; |
| int j; |
| |
| if (dest && dest_size > 0) |
| *dest = '\0'; |
| |
| /* Phase 1: scan for set flags, accumulate needed size. |
| * Phase 2: write to dest */ |
| for (j = 0 ; prop->s2i[j].str ; j++) { |
| if (prop->type == _RK_C_S2F && ival != -1 && |
| (ival & prop->s2i[j].val) != prop->s2i[j].val) |
| continue; |
| else if (prop->type == _RK_C_S2I && |
| ival != -1 && prop->s2i[j].val != ival) |
| continue; |
| |
| if (!dest) |
| of += strlen(prop->s2i[j].str) + (of > 0 ? 1 : 0); |
| else { |
| size_t r; |
| r = rd_snprintf(dest+of, dest_size-of, |
| "%s%s", |
| of > 0 ? delim:"", |
| prop->s2i[j].str); |
| if (r > dest_size-of) { |
| r = dest_size-of; |
| break; |
| } |
| of += r; |
| } |
| } |
| |
| return of+1/*nul*/; |
| } |
| |
| |
| /** |
| * Return "original"(re-created) configuration value string |
| */ |
| static rd_kafka_conf_res_t |
| rd_kafka_anyconf_get0 (const void *conf, const struct rd_kafka_property *prop, |
| char *dest, size_t *dest_size) { |
| char tmp[22]; |
| const char *val = NULL; |
| size_t val_len = 0; |
| int j; |
| |
| switch (prop->type) |
| { |
| case _RK_C_STR: |
| val = *_RK_PTR(const char **, conf, prop->offset); |
| break; |
| |
| case _RK_C_KSTR: |
| { |
| const rd_kafkap_str_t **kstr = _RK_PTR(const rd_kafkap_str_t **, |
| conf, prop->offset); |
| if (*kstr) |
| val = (*kstr)->str; |
| break; |
| } |
| |
| case _RK_C_PTR: |
| val = *_RK_PTR(const void **, conf, prop->offset); |
| if (val) { |
| rd_snprintf(tmp, sizeof(tmp), "%p", (void *)val); |
| val = tmp; |
| } |
| break; |
| |
| case _RK_C_BOOL: |
| val = (*_RK_PTR(int *, conf, prop->offset) ? "true" : "false"); |
| break; |
| |
| case _RK_C_INT: |
| rd_snprintf(tmp, sizeof(tmp), "%i", |
| *_RK_PTR(int *, conf, prop->offset)); |
| val = tmp; |
| break; |
| |
| case _RK_C_S2I: |
| for (j = 0 ; j < (int)RD_ARRAYSIZE(prop->s2i); j++) { |
| if (prop->s2i[j].val == |
| *_RK_PTR(int *, conf, prop->offset)) { |
| val = prop->s2i[j].str; |
| break; |
| } |
| } |
| break; |
| |
| case _RK_C_S2F: |
| { |
| const int ival = *_RK_PTR(const int *, conf, prop->offset); |
| |
| val_len = rd_kafka_conf_flags2str(dest, *dest_size, ",", |
| prop, ival); |
| if (dest) { |
| val_len = 0; |
| val = dest; |
| dest = NULL; |
| } |
| break; |
| } |
| |
| case _RK_C_PATLIST: |
| { |
| const rd_kafka_pattern_list_t **plist; |
| plist = _RK_PTR(const rd_kafka_pattern_list_t **, |
| conf, prop->offset); |
| if (*plist) |
| val = (*plist)->rkpl_orig; |
| break; |
| } |
| |
| default: |
| break; |
| } |
| |
| if (val_len) { |
| *dest_size = val_len+1; |
| return RD_KAFKA_CONF_OK; |
| } |
| |
| if (!val) |
| return RD_KAFKA_CONF_INVALID; |
| |
| val_len = strlen(val); |
| |
| if (dest) { |
| size_t use_len = RD_MIN(val_len, (*dest_size)-1); |
| memcpy(dest, val, use_len); |
| dest[use_len] = '\0'; |
| } |
| |
| /* Return needed size */ |
| *dest_size = val_len+1; |
| |
| return RD_KAFKA_CONF_OK; |
| } |
| |
| |
| static rd_kafka_conf_res_t rd_kafka_anyconf_get (int scope, const void *conf, |
| const char *name, |
| char *dest, size_t *dest_size){ |
| const struct rd_kafka_property *prop; |
| |
| for (prop = rd_kafka_properties; prop->name ; prop++) { |
| |
| if (!(prop->scope & scope) || strcmp(prop->name, name)) |
| continue; |
| |
| if (prop->type == _RK_C_ALIAS) |
| return rd_kafka_anyconf_get(scope, conf, |
| prop->sdef, |
| dest, dest_size); |
| |
| if (rd_kafka_anyconf_get0(conf, prop, dest, dest_size) == |
| RD_KAFKA_CONF_OK) |
| return RD_KAFKA_CONF_OK; |
| } |
| |
| return RD_KAFKA_CONF_UNKNOWN; |
| } |
| |
| rd_kafka_conf_res_t rd_kafka_topic_conf_get (const rd_kafka_topic_conf_t *conf, |
| const char *name, |
| char *dest, size_t *dest_size) { |
| return rd_kafka_anyconf_get(_RK_TOPIC, conf, name, dest, dest_size); |
| } |
| |
| rd_kafka_conf_res_t rd_kafka_conf_get (const rd_kafka_conf_t *conf, |
| const char *name, |
| char *dest, size_t *dest_size) { |
| rd_kafka_conf_res_t res; |
| res = rd_kafka_anyconf_get(_RK_GLOBAL, conf, name, dest, dest_size); |
| if (res != RD_KAFKA_CONF_UNKNOWN || !conf->topic_conf) |
| return res; |
| |
| /* Fallthru: |
| * If the global property was unknown, try getting it from the |
| * default topic config, if any. */ |
| return rd_kafka_topic_conf_get(conf->topic_conf, name, dest, dest_size); |
| } |
| |
| |
| static const char **rd_kafka_anyconf_dump (int scope, const void *conf, |
| size_t *cntp) { |
| const struct rd_kafka_property *prop; |
| char **arr; |
| int cnt = 0; |
| |
| arr = rd_calloc(sizeof(char *), RD_ARRAYSIZE(rd_kafka_properties)*2); |
| |
| for (prop = rd_kafka_properties; prop->name ; prop++) { |
| char *val = NULL; |
| size_t val_size; |
| |
| if (!(prop->scope & scope)) |
| continue; |
| |
| /* Skip aliases, show original property instead. |
| * Skip invalids. */ |
| if (prop->type == _RK_C_ALIAS || prop->type == _RK_C_INVALID) |
| continue; |
| |
| /* Query value size */ |
| if (rd_kafka_anyconf_get0(conf, prop, NULL, &val_size) != |
| RD_KAFKA_CONF_OK) |
| continue; |
| |
| /* Get value */ |
| val = malloc(val_size); |
| rd_kafka_anyconf_get0(conf, prop, val, &val_size); |
| |
| arr[cnt++] = rd_strdup(prop->name); |
| arr[cnt++] = val; |
| } |
| |
| *cntp = cnt; |
| |
| return (const char **)arr; |
| } |
| |
| |
| const char **rd_kafka_conf_dump (rd_kafka_conf_t *conf, size_t *cntp) { |
| return rd_kafka_anyconf_dump(_RK_GLOBAL, conf, cntp); |
| } |
| |
| const char **rd_kafka_topic_conf_dump (rd_kafka_topic_conf_t *conf, |
| size_t *cntp) { |
| return rd_kafka_anyconf_dump(_RK_TOPIC, conf, cntp); |
| } |
| |
| void rd_kafka_conf_dump_free (const char **arr, size_t cnt) { |
| char **_arr = (char **)arr; |
| unsigned int i; |
| |
| for (i = 0 ; i < cnt ; i++) |
| if (_arr[i]) |
| rd_free(_arr[i]); |
| |
| rd_free(_arr); |
| } |
| |
| void rd_kafka_conf_properties_show (FILE *fp) { |
| const struct rd_kafka_property *prop; |
| int last = 0; |
| int j; |
| char tmp[512]; |
| const char *dash80 = "----------------------------------------" |
| "----------------------------------------"; |
| |
| for (prop = rd_kafka_properties; prop->name ; prop++) { |
| const char *typeinfo = ""; |
| |
| /* Skip invalid properties. */ |
| if (prop->type == _RK_C_INVALID) |
| continue; |
| |
| if (!(prop->scope & last)) { |
| fprintf(fp, |
| "%s## %s configuration properties\n\n", |
| last ? "\n\n":"", |
| prop->scope == _RK_GLOBAL ? "Global": "Topic"); |
| |
| fprintf(fp, |
| "%-40s | %3s | %-15s | %13s | %-25s\n" |
| "%.*s-|-%.*s-|-%.*s-|-%.*s:|-%.*s\n", |
| "Property", "C/P", "Range", |
| "Default", "Description", |
| 40, dash80, 3, dash80, 15, dash80, |
| 13, dash80, 25, dash80); |
| |
| last = prop->scope & (_RK_GLOBAL|_RK_TOPIC); |
| |
| } |
| |
| fprintf(fp, "%-40s | %3s | ", prop->name, |
| (!(prop->scope & _RK_PRODUCER) == |
| !(prop->scope & _RK_CONSUMER) ? " * " : |
| ((prop->scope & _RK_PRODUCER) ? " P " : |
| (prop->scope & _RK_CONSUMER) ? " C " : ""))); |
| |
| switch (prop->type) |
| { |
| case _RK_C_STR: |
| case _RK_C_KSTR: |
| typeinfo = "string"; |
| case _RK_C_PATLIST: |
| if (prop->type == _RK_C_PATLIST) |
| typeinfo = "pattern list"; |
| if (prop->s2i[0].str) { |
| rd_kafka_conf_flags2str(tmp, sizeof(tmp), ", ", |
| prop, -1); |
| fprintf(fp, "%-15s | %13s", |
| tmp, prop->sdef ? prop->sdef : ""); |
| } else { |
| fprintf(fp, "%-15s | %13s", |
| "", prop->sdef ? prop->sdef : ""); |
| } |
| break; |
| case _RK_C_BOOL: |
| typeinfo = "boolean"; |
| fprintf(fp, "%-15s | %13s", "true, false", |
| prop->vdef ? "true" : "false"); |
| break; |
| case _RK_C_INT: |
| typeinfo = "integer"; |
| rd_snprintf(tmp, sizeof(tmp), |
| "%d .. %d", prop->vmin, prop->vmax); |
| fprintf(fp, "%-15s | %13i", tmp, prop->vdef); |
| break; |
| case _RK_C_S2I: |
| typeinfo = "enum value"; |
| rd_kafka_conf_flags2str(tmp, sizeof(tmp), ", ", |
| prop, -1); |
| fprintf(fp, "%-15s | ", tmp); |
| |
| for (j = 0 ; j < (int)RD_ARRAYSIZE(prop->s2i); j++) { |
| if (prop->s2i[j].val == prop->vdef) { |
| fprintf(fp, "%13s", prop->s2i[j].str); |
| break; |
| } |
| } |
| if (j == RD_ARRAYSIZE(prop->s2i)) |
| fprintf(fp, "%13s", " "); |
| break; |
| |
| case _RK_C_S2F: |
| typeinfo = "CSV flags"; |
| /* Dont duplicate builtin.features value in |
| * both Range and Default */ |
| if (!strcmp(prop->name, "builtin.features")) |
| *tmp = '\0'; |
| else |
| rd_kafka_conf_flags2str(tmp, sizeof(tmp), ", ", |
| prop, -1); |
| fprintf(fp, "%-15s | ", tmp); |
| rd_kafka_conf_flags2str(tmp, sizeof(tmp), ", ", |
| prop, prop->vdef); |
| fprintf(fp, "%13s", tmp); |
| |
| break; |
| case _RK_C_PTR: |
| typeinfo = "pointer"; |
| /* FALLTHRU */ |
| default: |
| fprintf(fp, "%-15s | %-13s", "", " "); |
| break; |
| } |
| |
| if (prop->type == _RK_C_ALIAS) |
| fprintf(fp, " | Alias for `%s`\n", prop->sdef); |
| else |
| fprintf(fp, " | %s <br>*Type: %s*\n", prop->desc, |
| typeinfo); |
| } |
| fprintf(fp, "\n"); |
| fprintf(fp, "### C/P legend: C = Consumer, P = Producer, * = both\n"); |
| } |