blob: b899899a50d48f0a81fa3a2d475901aee0be7623 [file] [log] [blame]
/*
* librdkafka - The Apache Kafka C/C++ library
*
* Copyright (c) 2017 Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "rdkafka_int.h"
#include "rdkafka_plugin.h"
#include "rddl.h"
/**
 * @brief One loaded plugin library, as stored in the conf object's
 *        plugin list.
 */
typedef struct rd_kafka_plugin_s {
char *rkplug_path; /* Library path; heap copy that is freed when the plugin is destroyed */
rd_kafka_t *rkplug_rk; /* Backpointer to the rk handle (not assigned by rd_kafka_plugin_new()) */
void *rkplug_handle; /* dlopen (or similar) handle, closed when the plugin is destroyed */
void *rkplug_opaque; /* Plugin's opaque, as handed back by its conf_init() */
} rd_kafka_plugin_t;
/**
 * @brief List comparator that orders plugins by their library path.
 *
 * @param _a First plugin (rd_kafka_plugin_t *).
 * @param _b Second plugin (rd_kafka_plugin_t *).
 *
 * @returns the strcmp() result of the two plugins' rkplug_path strings,
 *          i.e., 0 when both refer to the same path.
 */
static int rd_kafka_plugin_cmp (const void *_a, const void *_b) {
        const rd_kafka_plugin_t *lhs = (const rd_kafka_plugin_t *)_a;
        const rd_kafka_plugin_t *rhs = (const rd_kafka_plugin_t *)_b;

        return strcmp(lhs->rkplug_path, rhs->rkplug_path);
}
/**
 * @brief Load the plugin library at \p path, call its conf_init()
 *        constructor and, on success, register the plugin in \p conf.
 *
 * @param conf        Configuration object whose plugin list is updated.
 * @param path        Library path handed to the dynamic loader.
 * @param errstr      Buffer receiving a human readable message.
 * @param errstr_size Size of \p errstr in bytes.
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success (also when \p path is
 *          already registered), RD_KAFKA_RESP_ERR__FS if the library could
 *          not be opened, RD_KAFKA_RESP_ERR__INVALID_ARG if the library has
 *          no "conf_init" symbol, or whatever error conf_init() returned.
 *
 * @remark A path that is already in the plugin list is not loaded again:
 *         a notice is written to \p errstr and success is returned.
 *
 * @remark Libraries are refcounted and thus not unloaded until all
 *         plugins referencing the library have been destroyed
 *         (dlopen() and LoadLibrary() do this for us).
 */
static rd_kafka_resp_err_t
rd_kafka_plugin_new (rd_kafka_conf_t *conf, const char *path,
                     char *errstr, size_t errstr_size) {
        /* Lookup key: only the path field is consulted by the comparator. */
        const rd_kafka_plugin_t lookup = { .rkplug_path = (char *)path };
        rd_kafka_plugin_f_conf_init_t *init_fn;
        rd_kafka_plugin_t *plugin;
        rd_kafka_resp_err_t err;
        void *opaque = NULL;
        void *lib;

        /* A path that is already registered is not loaded a second time. */
        if (rd_list_find(&conf->plugins, &lookup, rd_kafka_plugin_cmp)) {
                rd_snprintf(errstr, errstr_size,
                            "Ignoring duplicate plugin %s", path);
                return RD_KAFKA_RESP_ERR_NO_ERROR;
        }

        rd_kafka_dbg0(conf, PLUGIN, "PLUGLOAD",
                      "Loading plugin \"%s\"", path);

        /* Open the library; the loader fills in errstr on failure. */
        lib = rd_dl_open(path, errstr, errstr_size);
        if (!lib) {
                rd_kafka_dbg0(conf, PLUGIN, "PLUGLOAD",
                              "Failed to load plugin \"%s\": %s",
                              path, errstr);
                return RD_KAFKA_RESP_ERR__FS;
        }

        /* Resolve the plugin's constructor. */
        init_fn = rd_dl_sym(lib, "conf_init", errstr, errstr_size);
        if (!init_fn) {
                err = RD_KAFKA_RESP_ERR__INVALID_ARG;
                goto unload;
        }

        /* Run the constructor; it may hand back an opaque for the plugin. */
        rd_kafka_dbg0(conf, PLUGIN, "PLUGINIT",
                      "Calling plugin \"%s\" conf_init()", path);
        err = init_fn(conf, &opaque, errstr, errstr_size);
        if (err)
                goto unload;

        /* Constructor succeeded: record the plugin in the conf's list. */
        plugin = rd_calloc(1, sizeof(*plugin));
        plugin->rkplug_path   = rd_strdup(path);
        plugin->rkplug_handle = lib;
        plugin->rkplug_opaque = opaque;
        rd_list_add(&conf->plugins, plugin);

        rd_kafka_dbg0(conf, PLUGIN, "PLUGLOAD",
                      "Plugin \"%s\" loaded", path);

        return RD_KAFKA_RESP_ERR_NO_ERROR;

unload:
        /* Shared failure exit once the library has been opened. */
        rd_dl_close(lib);
        return err;
}
/**
 * @brief Release a plugin object: close its library handle, then free the
 *        path string and the object itself.
 *
 * @remark Any conf_destroy() interceptors will have been called prior to
 *         this call.
 * @remark The plugin is not unlinked from any list; that is the caller's
 *         responsibility.
 * @remark This relies on the actual library loader to refcount libraries,
 *         especially in the config copy case.
 *         This is true for POSIX dlopen() and Win32 LoadLibrary().
 * @locality application thread
 */
static void rd_kafka_plugin_destroy (rd_kafka_plugin_t *rkplug) {
        void *dl_handle = rkplug->rkplug_handle;
        char *lib_path  = rkplug->rkplug_path;

        rd_dl_close(dl_handle);
        rd_free(lib_path);
        rd_free(rkplug);
}
/**
 * @brief (Re)load the plugins named in \p paths into \p conf.
 *
 * @param conf        Configuration object whose plugin list is replaced.
 * @param paths       ';'-separated list of library paths, or NULL/empty
 *                    to just unload the current plugins.
 * @param errstr      Buffer receiving a human readable error message.
 * @param errstr_size Size of \p errstr in bytes.
 *
 * @remark Any previously loaded plugins are unloaded first.
 * @remark Loading stops at the first failing plugin, in which case all
 *         plugins loaded by this call are unloaded again and the plugin
 *         list is left empty.
 *
 * @returns RD_KAFKA_CONF_OK on success, or RD_KAFKA_CONF_INVALID if a plugin
 *          failed to load, with \p errstr describing the failure and, when
 *          there is room for it, suffixed by " (plugin <path>)".
 * @locality application thread calling rd_kafka_new().
 */
static rd_kafka_conf_res_t
rd_kafka_plugins_conf_set0 (rd_kafka_conf_t *conf, const char *paths,
                            char *errstr, size_t errstr_size) {
        char *s;

        /* Drop whatever was loaded before and start from an empty list. */
        rd_list_destroy(&conf->plugins);
        rd_list_init(&conf->plugins, 0, (void *)&rd_kafka_plugin_destroy);

        if (!paths || !*paths)
                return RD_KAFKA_CONF_OK;

        /* Split paths by ; (on a writable copy, since separators are
         * overwritten with NULs below). */
        rd_strdupa(&s, paths);

        rd_kafka_dbg0(conf, PLUGIN, "PLUGLOAD",
                      "Loading plugins from conf object %p: \"%s\"",
                      conf, paths);

        while (s && *s) {
                char *path = s;
                char *t;
                rd_kafka_resp_err_t err;

                /* Terminate the current path and advance to the next one. */
                if ((t = strchr(s, ';'))) {
                        *t = '\0';
                        s = t+1;
                } else {
                        s = NULL;
                }

                if ((err = rd_kafka_plugin_new(conf, path,
                                               errstr, errstr_size))) {
                        /* Failed to load plugin */
                        size_t elen = errstr_size > 0 ? strlen(errstr) : 0;

                        /* See if there is room for appending the
                         * plugin path to the error message.
                         * The measured literal must match the format
                         * string's fixed part (including the leading
                         * space), otherwise the suffix can be truncated
                         * by one character. */
                        if (elen + strlen(" (plugin )") + strlen(path) <
                            errstr_size)
                                rd_snprintf(errstr+elen, errstr_size-elen,
                                            " (plugin %s)", path);

                        /* Unload everything loaded so far and leave the
                         * list in the same initialised-empty state as on
                         * entry, so later use of the conf object does not
                         * operate on a destroyed list.
                         * NOTE(review): assumes rd_list_destroy() does not
                         * by itself leave the list reusable - confirm. */
                        rd_list_destroy(&conf->plugins);
                        rd_list_init(&conf->plugins, 0,
                                     (void *)&rd_kafka_plugin_destroy);

                        return RD_KAFKA_CONF_INVALID;
                }
        }

        return RD_KAFKA_CONF_OK;
}
/**
 * @brief Conf setter for "plugin.library.paths"
 *
 * Forwards \p value as the plugin path list to load into \p pconf; when
 * \p set_mode is _RK_CONF_PROP_SET_DEL no paths are passed, which only
 * unloads the currently loaded plugins.
 *
 * @returns the result of loading the plugins.
 */
rd_kafka_conf_res_t rd_kafka_plugins_conf_set (
        int scope, void *pconf, const char *name, const char *value,
        void *dstptr, rd_kafka_conf_set_mode_t set_mode,
        char *errstr, size_t errstr_size) {
        rd_kafka_conf_t *conf = (rd_kafka_conf_t *)pconf;
        const char *paths = value;

        /* This property only exists on the global configuration. */
        assert(scope == _RK_GLOBAL);

        /* Deleting the property means "no plugins". */
        if (set_mode == _RK_CONF_PROP_SET_DEL)
                paths = NULL;

        return rd_kafka_plugins_conf_set0(conf, paths, errstr, errstr_size);
}