| /* |
| * librdkafka - The Apache Kafka C/C++ library |
| * |
| * Copyright (c) 2017 Magnus Edenhill |
| * All rights reserved. |
| * |
| * Redistribution and use in source and binary forms, with or without |
| * modification, are permitted provided that the following conditions are met: |
| * |
| * 1. Redistributions of source code must retain the above copyright notice, |
| * this list of conditions and the following disclaimer. |
| * 2. Redistributions in binary form must reproduce the above copyright notice, |
| * this list of conditions and the following disclaimer in the documentation |
| * and/or other materials provided with the distribution. |
| * |
| * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
| * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
| * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
| * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
| * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
| * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
| * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
| * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
| * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
| * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
| * POSSIBILITY OF SUCH DAMAGE. |
| */ |
| |
| #include "rdkafka_int.h" |
| #include "rdkafka_plugin.h" |
| #include "rddl.h" |
| |
| |
| typedef struct rd_kafka_plugin_s { |
| char *rkplug_path; /* Library path */ |
| rd_kafka_t *rkplug_rk; /* Backpointer to the rk handle */ |
| void *rkplug_handle; /* dlopen (or similar) handle */ |
| void *rkplug_opaque; /* Plugin's opaque */ |
| |
| } rd_kafka_plugin_t; |
| |
| |
| /** |
| * @brief Plugin path comparator |
| */ |
| static int rd_kafka_plugin_cmp (const void *_a, const void *_b) { |
| const rd_kafka_plugin_t *a = _a, *b = _b; |
| |
| return strcmp(a->rkplug_path, b->rkplug_path); |
| } |
| |
| |
| /** |
| * @brief Add plugin (by library path) and calls its conf_init() constructor |
| * |
| * @returns an error code on error. |
| * @remark duplicate plugins are silently ignored. |
| * |
| * @remark Libraries are refcounted and thus not unloaded until all |
| * plugins referencing the library have been destroyed. |
| * (dlopen() and LoadLibrary() does this for us) |
| */ |
| static rd_kafka_resp_err_t |
| rd_kafka_plugin_new (rd_kafka_conf_t *conf, const char *path, |
| char *errstr, size_t errstr_size) { |
| rd_kafka_plugin_t *rkplug; |
| const rd_kafka_plugin_t skel = { .rkplug_path = (char *)path }; |
| rd_kafka_plugin_f_conf_init_t *conf_init; |
| rd_kafka_resp_err_t err; |
| void *handle; |
| void *plug_opaque = NULL; |
| |
| /* Avoid duplicates */ |
| if (rd_list_find(&conf->plugins, &skel, rd_kafka_plugin_cmp)) { |
| rd_snprintf(errstr, errstr_size, |
| "Ignoring duplicate plugin %s", path); |
| return RD_KAFKA_RESP_ERR_NO_ERROR; |
| } |
| |
| rd_kafka_dbg0(conf, PLUGIN, "PLUGLOAD", |
| "Loading plugin \"%s\"", path); |
| |
| /* Attempt to load library */ |
| if (!(handle = rd_dl_open(path, errstr, errstr_size))) { |
| rd_kafka_dbg0(conf, PLUGIN, "PLUGLOAD", |
| "Failed to load plugin \"%s\": %s", |
| path, errstr); |
| return RD_KAFKA_RESP_ERR__FS; |
| } |
| |
| /* Find conf_init() function */ |
| if (!(conf_init = rd_dl_sym(handle, "conf_init", |
| errstr, errstr_size))) { |
| rd_dl_close(handle); |
| return RD_KAFKA_RESP_ERR__INVALID_ARG; |
| } |
| |
| /* Call conf_init() */ |
| rd_kafka_dbg0(conf, PLUGIN, "PLUGINIT", |
| "Calling plugin \"%s\" conf_init()", path); |
| |
| if ((err = conf_init(conf, &plug_opaque, errstr, errstr_size))) { |
| rd_dl_close(handle); |
| return err; |
| } |
| |
| rkplug = rd_calloc(1, sizeof(*rkplug)); |
| rkplug->rkplug_path = rd_strdup(path); |
| rkplug->rkplug_handle = handle; |
| rkplug->rkplug_opaque = plug_opaque; |
| |
| rd_list_add(&conf->plugins, rkplug); |
| |
| rd_kafka_dbg0(conf, PLUGIN, "PLUGLOAD", |
| "Plugin \"%s\" loaded", path); |
| |
| return RD_KAFKA_RESP_ERR_NO_ERROR; |
| } |
| |
| |
| /** |
| * @brief Free the plugin, any conf_destroy() interceptors will have been |
| * called prior to this call. |
| * @remark plugin is not removed from any list (caller's responsibility) |
| * @remark this relies on the actual library loader to refcount libraries, |
| * especially in the config copy case. |
| * This is true for POSIX dlopen() and Win32 LoadLibrary(). |
| * @locality application thread |
| */ |
| static void rd_kafka_plugin_destroy (rd_kafka_plugin_t *rkplug) { |
| rd_dl_close(rkplug->rkplug_handle); |
| rd_free(rkplug->rkplug_path); |
| rd_free(rkplug); |
| } |
| |
| |
| |
| /** |
| * @brief Initialize all configured plugins. |
| * |
| * @remark Any previously loaded plugins will be unloaded. |
| * |
| * @returns the error code of the first failing plugin. |
| * @locality application thread calling rd_kafka_new(). |
| */ |
| static rd_kafka_conf_res_t |
| rd_kafka_plugins_conf_set0 (rd_kafka_conf_t *conf, const char *paths, |
| char *errstr, size_t errstr_size) { |
| char *s; |
| |
| rd_list_destroy(&conf->plugins); |
| rd_list_init(&conf->plugins, 0, (void *)&rd_kafka_plugin_destroy); |
| |
| if (!paths || !*paths) |
| return RD_KAFKA_CONF_OK; |
| |
| /* Split paths by ; */ |
| rd_strdupa(&s, paths); |
| |
| rd_kafka_dbg0(conf, PLUGIN, "PLUGLOAD", |
| "Loading plugins from conf object %p: \"%s\"", |
| conf, paths); |
| |
| while (s && *s) { |
| char *path = s; |
| char *t; |
| rd_kafka_resp_err_t err; |
| |
| if ((t = strchr(s, ';'))) { |
| *t = '\0'; |
| s = t+1; |
| } else { |
| s = NULL; |
| } |
| |
| if ((err = rd_kafka_plugin_new(conf, path, |
| errstr, errstr_size))) { |
| /* Failed to load plugin */ |
| size_t elen = errstr_size > 0 ? strlen(errstr) : 0; |
| |
| /* See if there is room for appending the |
| * plugin path to the error message. */ |
| if (elen + strlen("(plugin )") + strlen(path) < |
| errstr_size) |
| rd_snprintf(errstr+elen, errstr_size-elen, |
| " (plugin %s)", path); |
| |
| rd_list_destroy(&conf->plugins); |
| return RD_KAFKA_CONF_INVALID; |
| } |
| } |
| |
| return RD_KAFKA_CONF_OK; |
| } |
| |
| |
| /** |
| * @brief Conf setter for "plugin.library.paths" |
| */ |
| rd_kafka_conf_res_t rd_kafka_plugins_conf_set ( |
| int scope, void *pconf, const char *name, const char *value, |
| void *dstptr, rd_kafka_conf_set_mode_t set_mode, |
| char *errstr, size_t errstr_size) { |
| |
| assert(scope == _RK_GLOBAL); |
| return rd_kafka_plugins_conf_set0((rd_kafka_conf_t *)pconf, |
| set_mode == _RK_CONF_PROP_SET_DEL ? |
| NULL : value, errstr, errstr_size); |
| } |