| /* |
| * librdkafka - The Apache Kafka C/C++ library |
| * |
| * Copyright (c) 2015 Magnus Edenhill |
| * All rights reserved. |
| * |
| * Redistribution and use in source and binary forms, with or without |
| * modification, are permitted provided that the following conditions are met: |
| * |
| * 1. Redistributions of source code must retain the above copyright notice, |
| * this list of conditions and the following disclaimer. |
| * 2. Redistributions in binary form must reproduce the above copyright notice, |
| * this list of conditions and the following disclaimer in the documentation |
| * and/or other materials provided with the distribution. |
| * |
| * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
| * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
| * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
| * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
| * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
| * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
| * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
| * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
| * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
| * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
| * POSSIBILITY OF SUCH DAMAGE. |
| */ |
| |
| #include "rdkafka_int.h" |
| #include "rdkafka_transport.h" |
| #include "rdkafka_transport_int.h" |
| #include "rdkafka_sasl.h" |
| #include "rdkafka_sasl_int.h" |
| #include "rdstring.h" |
| |
| #ifdef __FreeBSD__ |
| #include <sys/wait.h> /* For WIF.. */ |
| #endif |
| |
| #ifdef __APPLE__ |
| /* Apple has deprecated most of the SASL API for unknown reason, |
| * silence those warnings. */ |
| #pragma clang diagnostic ignored "-Wdeprecated-declarations" |
| #endif |
| |
| #include <sasl/sasl.h> |
| |
| static mtx_t rd_kafka_sasl_cyrus_kinit_lock; |
| |
| typedef struct rd_kafka_sasl_cyrus_state_s { |
| sasl_conn_t *conn; |
| sasl_callback_t callbacks[16]; |
| } rd_kafka_sasl_cyrus_state_t; |
| |
| |
| |
| /** |
| * Handle received frame from broker. |
| */ |
| static int rd_kafka_sasl_cyrus_recv (struct rd_kafka_transport_s *rktrans, |
| const void *buf, size_t size, |
| char *errstr, size_t errstr_size) { |
| rd_kafka_sasl_cyrus_state_t *state = rktrans->rktrans_sasl.state; |
| int r; |
| |
| if (rktrans->rktrans_sasl.complete && size == 0) |
| goto auth_successful; |
| |
| do { |
| sasl_interact_t *interact = NULL; |
| const char *out; |
| unsigned int outlen; |
| |
| r = sasl_client_step(state->conn, |
| size > 0 ? buf : NULL, size, |
| &interact, |
| &out, &outlen); |
| |
| if (r >= 0) { |
| /* Note: outlen may be 0 here for an empty response */ |
| if (rd_kafka_sasl_send(rktrans, out, outlen, |
| errstr, errstr_size) == -1) |
| return -1; |
| } |
| |
| if (r == SASL_INTERACT) |
| rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASL", |
| "SASL_INTERACT: %lu %s, %s, %s, %p", |
| interact->id, |
| interact->challenge, |
| interact->prompt, |
| interact->defresult, |
| interact->result); |
| |
| } while (r == SASL_INTERACT); |
| |
| if (r == SASL_CONTINUE) |
| return 0; /* Wait for more data from broker */ |
| else if (r != SASL_OK) { |
| rd_snprintf(errstr, errstr_size, |
| "SASL handshake failed (step): %s", |
| sasl_errdetail(state->conn)); |
| return -1; |
| } |
| |
| /* Authentication successful */ |
| auth_successful: |
| if (rktrans->rktrans_rkb->rkb_rk->rk_conf.debug & |
| RD_KAFKA_DBG_SECURITY) { |
| const char *user, *mech, *authsrc; |
| |
| if (sasl_getprop(state->conn, SASL_USERNAME, |
| (const void **)&user) != SASL_OK) |
| user = "(unknown)"; |
| |
| if (sasl_getprop(state->conn, SASL_MECHNAME, |
| (const void **)&mech) != SASL_OK) |
| mech = "(unknown)"; |
| |
| if (sasl_getprop(state->conn, SASL_AUTHSOURCE, |
| (const void **)&authsrc) != SASL_OK) |
| authsrc = "(unknown)"; |
| |
| rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASL", |
| "Authenticated as %s using %s (%s)", |
| user, mech, authsrc); |
| } |
| |
| rd_kafka_sasl_auth_done(rktrans); |
| |
| return 0; |
| } |
| |
| |
| |
| |
| static ssize_t render_callback (const char *key, char *buf, |
| size_t size, void *opaque) { |
| rd_kafka_broker_t *rkb = opaque; |
| |
| if (!strcmp(key, "broker.name")) { |
| char *val, *t; |
| size_t len; |
| rd_kafka_broker_lock(rkb); |
| rd_strdupa(&val, rkb->rkb_nodename); |
| rd_kafka_broker_unlock(rkb); |
| |
| /* Just the broker name, no port */ |
| if ((t = strchr(val, ':'))) |
| len = (size_t)(t-val); |
| else |
| len = strlen(val); |
| |
| if (buf) |
| memcpy(buf, val, RD_MIN(len, size)); |
| |
| return len; |
| |
| } else { |
| rd_kafka_conf_res_t res; |
| size_t destsize = size; |
| |
| /* Try config lookup. */ |
| res = rd_kafka_conf_get(&rkb->rkb_rk->rk_conf, key, |
| buf, &destsize); |
| if (res != RD_KAFKA_CONF_OK) |
| return -1; |
| |
| /* Dont include \0 in returned size */ |
| return (destsize > 0 ? destsize-1 : destsize); |
| } |
| } |
| |
| |
| /** |
| * Execute kinit to refresh ticket. |
| * |
| * Returns 0 on success, -1 on error. |
| * |
| * Locality: any |
| */ |
| static int rd_kafka_sasl_cyrus_kinit_refresh (rd_kafka_broker_t *rkb) { |
| rd_kafka_t *rk = rkb->rkb_rk; |
| int r; |
| char *cmd; |
| char errstr[128]; |
| |
| if (!rk->rk_conf.sasl.kinit_cmd || |
| !strstr(rk->rk_conf.sasl.mechanisms, "GSSAPI")) |
| return 0; /* kinit not configured */ |
| |
| /* Build kinit refresh command line using string rendering and config */ |
| cmd = rd_string_render(rk->rk_conf.sasl.kinit_cmd, |
| errstr, sizeof(errstr), |
| render_callback, rkb); |
| if (!cmd) { |
| rd_rkb_log(rkb, LOG_ERR, "SASLREFRESH", |
| "Failed to construct kinit command " |
| "from sasl.kerberos.kinit.cmd template: %s", |
| errstr); |
| return -1; |
| } |
| |
| /* Execute kinit */ |
| rd_rkb_dbg(rkb, SECURITY, "SASLREFRESH", |
| "Refreshing SASL keys with command: %s", cmd); |
| |
| mtx_lock(&rd_kafka_sasl_cyrus_kinit_lock); |
| r = system(cmd); |
| mtx_unlock(&rd_kafka_sasl_cyrus_kinit_lock); |
| |
| if (r == -1) { |
| rd_rkb_log(rkb, LOG_ERR, "SASLREFRESH", |
| "SASL key refresh failed: Failed to execute %s", |
| cmd); |
| rd_free(cmd); |
| return -1; |
| } else if (WIFSIGNALED(r)) { |
| rd_rkb_log(rkb, LOG_ERR, "SASLREFRESH", |
| "SASL key refresh failed: %s: received signal %d", |
| cmd, WTERMSIG(r)); |
| rd_free(cmd); |
| return -1; |
| } else if (WIFEXITED(r) && WEXITSTATUS(r) != 0) { |
| rd_rkb_log(rkb, LOG_ERR, "SASLREFRESH", |
| "SASL key refresh failed: %s: exited with code %d", |
| cmd, WEXITSTATUS(r)); |
| rd_free(cmd); |
| return -1; |
| } |
| |
| rd_free(cmd); |
| |
| rd_rkb_dbg(rkb, SECURITY, "SASLREFRESH", "SASL key refreshed"); |
| return 0; |
| } |
| |
| |
| /** |
| * Refresh timer callback |
| * |
| * Locality: kafka main thread |
| */ |
| static void rd_kafka_sasl_cyrus_kinit_refresh_tmr_cb (rd_kafka_timers_t *rkts, |
| void *arg) { |
| rd_kafka_broker_t *rkb = arg; |
| |
| rd_kafka_sasl_cyrus_kinit_refresh(rkb); |
| } |
| |
| |
| |
| /** |
| * |
| * libsasl callbacks |
| * |
| */ |
| static RD_UNUSED int |
| rd_kafka_sasl_cyrus_cb_getopt (void *context, const char *plugin_name, |
| const char *option, |
| const char **result, unsigned *len) { |
| rd_kafka_transport_t *rktrans = context; |
| |
| if (!strcmp(option, "client_mech_list")) |
| *result = "GSSAPI"; |
| if (!strcmp(option, "canon_user_plugin")) |
| *result = "INTERNAL"; |
| |
| if (*result && len) |
| *len = strlen(*result); |
| |
| rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "LIBSASL", |
| "CB_GETOPT: plugin %s, option %s: returning %s", |
| plugin_name, option, *result); |
| |
| return SASL_OK; |
| } |
| |
| static int rd_kafka_sasl_cyrus_cb_log (void *context, int level, const char *message){ |
| rd_kafka_transport_t *rktrans = context; |
| |
| if (level >= LOG_DEBUG) |
| rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "LIBSASL", |
| "%s", message); |
| else |
| rd_rkb_log(rktrans->rktrans_rkb, level, "LIBSASL", |
| "%s", message); |
| return SASL_OK; |
| } |
| |
| |
| static int rd_kafka_sasl_cyrus_cb_getsimple (void *context, int id, |
| const char **result, unsigned *len) { |
| rd_kafka_transport_t *rktrans = context; |
| |
| switch (id) |
| { |
| case SASL_CB_USER: |
| case SASL_CB_AUTHNAME: |
| *result = rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.username; |
| break; |
| |
| default: |
| *result = NULL; |
| break; |
| } |
| |
| if (len) |
| *len = *result ? strlen(*result) : 0; |
| |
| rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "LIBSASL", |
| "CB_GETSIMPLE: id 0x%x: returning %s", id, *result); |
| |
| return *result ? SASL_OK : SASL_FAIL; |
| } |
| |
| |
| static int rd_kafka_sasl_cyrus_cb_getsecret (sasl_conn_t *conn, void *context, |
| int id, sasl_secret_t **psecret) { |
| rd_kafka_transport_t *rktrans = context; |
| const char *password; |
| |
| password = rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.password; |
| |
| if (!password) { |
| *psecret = NULL; |
| } else { |
| size_t passlen = strlen(password); |
| *psecret = rd_realloc(*psecret, sizeof(**psecret) + passlen); |
| (*psecret)->len = passlen; |
| memcpy((*psecret)->data, password, passlen); |
| } |
| |
| rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "LIBSASL", |
| "CB_GETSECRET: id 0x%x: returning %s", |
| id, *psecret ? "(hidden)":"NULL"); |
| |
| return SASL_OK; |
| } |
| |
| static int rd_kafka_sasl_cyrus_cb_chalprompt (void *context, int id, |
| const char *challenge, |
| const char *prompt, |
| const char *defres, |
| const char **result, unsigned *len) { |
| rd_kafka_transport_t *rktrans = context; |
| |
| *result = "min_chalprompt"; |
| *len = strlen(*result); |
| |
| rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "LIBSASL", |
| "CB_CHALPROMPT: id 0x%x, challenge %s, prompt %s, " |
| "default %s: returning %s", |
| id, challenge, prompt, defres, *result); |
| |
| return SASL_OK; |
| } |
| |
| static int rd_kafka_sasl_cyrus_cb_getrealm (void *context, int id, |
| const char **availrealms, |
| const char **result) { |
| rd_kafka_transport_t *rktrans = context; |
| |
| *result = *availrealms; |
| |
| rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "LIBSASL", |
| "CB_GETREALM: id 0x%x: returning %s", id, *result); |
| |
| return SASL_OK; |
| } |
| |
| |
| static RD_UNUSED int |
| rd_kafka_sasl_cyrus_cb_canon (sasl_conn_t *conn, |
| void *context, |
| const char *in, unsigned inlen, |
| unsigned flags, |
| const char *user_realm, |
| char *out, unsigned out_max, |
| unsigned *out_len) { |
| rd_kafka_transport_t *rktrans = context; |
| |
| if (strstr(rktrans->rktrans_rkb->rkb_rk->rk_conf. |
| sasl.mechanisms, "GSSAPI")) { |
| *out_len = rd_snprintf(out, out_max, "%s", |
| rktrans->rktrans_rkb->rkb_rk-> |
| rk_conf.sasl.principal); |
| } else if (!strcmp(rktrans->rktrans_rkb->rkb_rk->rk_conf. |
| sasl.mechanisms, "PLAIN")) { |
| *out_len = rd_snprintf(out, out_max, "%.*s", inlen, in); |
| } else |
| out = NULL; |
| |
| rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "LIBSASL", |
| "CB_CANON: flags 0x%x, \"%.*s\" @ \"%s\": returning \"%.*s\"", |
| flags, (int)inlen, in, user_realm, (int)(*out_len), out); |
| |
| return out ? SASL_OK : SASL_FAIL; |
| } |
| |
| |
| static void rd_kafka_sasl_cyrus_close (struct rd_kafka_transport_s *rktrans) { |
| rd_kafka_sasl_cyrus_state_t *state = rktrans->rktrans_sasl.state; |
| |
| if (!state) |
| return; |
| |
| if (state->conn) |
| sasl_dispose(&state->conn); |
| rd_free(state); |
| } |
| |
| |
| /** |
| * Initialize and start SASL authentication. |
| * |
| * Returns 0 on successful init and -1 on error. |
| * |
| * Locality: broker thread |
| */ |
| static int rd_kafka_sasl_cyrus_client_new (rd_kafka_transport_t *rktrans, |
| const char *hostname, |
| char *errstr, size_t errstr_size) { |
| int r; |
| rd_kafka_sasl_cyrus_state_t *state; |
| rd_kafka_broker_t *rkb = rktrans->rktrans_rkb; |
| rd_kafka_t *rk = rkb->rkb_rk; |
| sasl_callback_t callbacks[16] = { |
| // { SASL_CB_GETOPT, (void *)rd_kafka_sasl_cyrus_cb_getopt, rktrans }, |
| { SASL_CB_LOG, (void *)rd_kafka_sasl_cyrus_cb_log, rktrans }, |
| { SASL_CB_AUTHNAME, (void *)rd_kafka_sasl_cyrus_cb_getsimple, rktrans }, |
| { SASL_CB_PASS, (void *)rd_kafka_sasl_cyrus_cb_getsecret, rktrans }, |
| { SASL_CB_ECHOPROMPT, (void *)rd_kafka_sasl_cyrus_cb_chalprompt, rktrans }, |
| { SASL_CB_GETREALM, (void *)rd_kafka_sasl_cyrus_cb_getrealm, rktrans }, |
| { SASL_CB_CANON_USER, (void *)rd_kafka_sasl_cyrus_cb_canon, rktrans }, |
| { SASL_CB_LIST_END } |
| }; |
| |
| state = rd_calloc(1, sizeof(*state)); |
| rktrans->rktrans_sasl.state = state; |
| |
| /* SASL_CB_USER is needed for PLAIN but breaks GSSAPI */ |
| if (!strcmp(rk->rk_conf.sasl.mechanisms, "PLAIN")) { |
| int endidx; |
| /* Find end of callbacks array */ |
| for (endidx = 0 ; |
| callbacks[endidx].id != SASL_CB_LIST_END ; endidx++) |
| ; |
| |
| callbacks[endidx].id = SASL_CB_USER; |
| callbacks[endidx].proc = (void *)rd_kafka_sasl_cyrus_cb_getsimple; |
| callbacks[endidx].context = rktrans; |
| endidx++; |
| callbacks[endidx].id = SASL_CB_LIST_END; |
| } |
| |
| memcpy(state->callbacks, callbacks, sizeof(callbacks)); |
| |
| /* Acquire or refresh ticket if kinit is configured */ |
| rd_kafka_sasl_cyrus_kinit_refresh(rkb); |
| |
| r = sasl_client_new(rk->rk_conf.sasl.service_name, hostname, |
| NULL, NULL, /* no local & remote IP checks */ |
| state->callbacks, 0, &state->conn); |
| if (r != SASL_OK) { |
| rd_snprintf(errstr, errstr_size, "%s", |
| sasl_errstring(r, NULL, NULL)); |
| return -1; |
| } |
| |
| if (rk->rk_conf.debug & RD_KAFKA_DBG_SECURITY) { |
| const char *avail_mechs; |
| sasl_listmech(state->conn, NULL, NULL, " ", NULL, |
| &avail_mechs, NULL, NULL); |
| rd_rkb_dbg(rkb, SECURITY, "SASL", |
| "My supported SASL mechanisms: %s", avail_mechs); |
| } |
| |
| do { |
| const char *out; |
| unsigned int outlen; |
| const char *mech = NULL; |
| |
| r = sasl_client_start(state->conn, |
| rk->rk_conf.sasl.mechanisms, |
| NULL, &out, &outlen, &mech); |
| |
| if (r >= 0) |
| if (rd_kafka_sasl_send(rktrans, out, outlen, |
| errstr, errstr_size)) |
| return -1; |
| } while (r == SASL_INTERACT); |
| |
| if (r == SASL_OK) { |
| /* PLAIN is appearantly done here, but we still need to make sure |
| * the PLAIN frame is sent and we get a response back (but we must |
| * not pass the response to libsasl or it will fail). */ |
| rktrans->rktrans_sasl.complete = 1; |
| return 0; |
| |
| } else if (r != SASL_CONTINUE) { |
| rd_snprintf(errstr, errstr_size, |
| "SASL handshake failed (start (%d)): %s", |
| r, sasl_errdetail(state->conn)); |
| return -1; |
| } |
| |
| return 0; |
| } |
| |
| |
| |
| |
| |
| |
| |
| /** |
| * Per handle SASL term. |
| * |
| * Locality: broker thread |
| */ |
| static void rd_kafka_sasl_cyrus_broker_term (rd_kafka_broker_t *rkb) { |
| rd_kafka_t *rk = rkb->rkb_rk; |
| |
| if (!rk->rk_conf.sasl.kinit_cmd) |
| return; |
| |
| rd_kafka_timer_stop(&rk->rk_timers, &rkb->rkb_sasl_kinit_refresh_tmr,1); |
| } |
| |
| /** |
| * Broker SASL init. |
| * |
| * Locality: broker thread |
| */ |
| static void rd_kafka_sasl_cyrus_broker_init (rd_kafka_broker_t *rkb) { |
| rd_kafka_t *rk = rkb->rkb_rk; |
| |
| if (!rk->rk_conf.sasl.kinit_cmd || |
| !strstr(rk->rk_conf.sasl.mechanisms, "GSSAPI")) |
| return; /* kinit not configured, no need to start timer */ |
| |
| rd_kafka_timer_start(&rk->rk_timers, &rkb->rkb_sasl_kinit_refresh_tmr, |
| rk->rk_conf.sasl.relogin_min_time * 1000ll, |
| rd_kafka_sasl_cyrus_kinit_refresh_tmr_cb, rkb); |
| } |
| |
| |
| |
| static int rd_kafka_sasl_cyrus_conf_validate (rd_kafka_t *rk, |
| char *errstr, size_t errstr_size) { |
| |
| if (strcmp(rk->rk_conf.sasl.mechanisms, "GSSAPI")) |
| return 0; |
| |
| if (rk->rk_conf.sasl.kinit_cmd) { |
| rd_kafka_broker_t rkb; |
| char *cmd; |
| char tmperr[128]; |
| |
| memset(&rkb, 0, sizeof(rkb)); |
| strcpy(rkb.rkb_nodename, "ATestBroker:9092"); |
| rkb.rkb_rk = rk; |
| mtx_init(&rkb.rkb_lock, mtx_plain); |
| |
| cmd = rd_string_render(rk->rk_conf.sasl.kinit_cmd, |
| tmperr, sizeof(tmperr), |
| render_callback, &rkb); |
| |
| mtx_destroy(&rkb.rkb_lock); |
| |
| if (!cmd) { |
| rd_snprintf(errstr, errstr_size, |
| "Invalid sasl.kerberos.kinit.cmd value: %s", |
| tmperr); |
| return -1; |
| } |
| |
| rd_free(cmd); |
| } |
| |
| return 0; |
| } |
| |
| |
| /** |
| * Global SASL termination. |
| */ |
| void rd_kafka_sasl_cyrus_global_term (void) { |
| /* NOTE: Should not be called since the application may be using SASL too*/ |
| /* sasl_done(); */ |
| mtx_destroy(&rd_kafka_sasl_cyrus_kinit_lock); |
| } |
| |
| |
| /** |
| * Global SASL init, called once per runtime. |
| */ |
| int rd_kafka_sasl_cyrus_global_init (void) { |
| int r; |
| |
| mtx_init(&rd_kafka_sasl_cyrus_kinit_lock, mtx_plain); |
| |
| r = sasl_client_init(NULL); |
| if (r != SASL_OK) { |
| fprintf(stderr, "librdkafka: sasl_client_init() failed: %s\n", |
| sasl_errstring(r, NULL, NULL)); |
| return -1; |
| } |
| |
| return 0; |
| } |
| |
| |
| const struct rd_kafka_sasl_provider rd_kafka_sasl_cyrus_provider = { |
| .name = "Cyrus", |
| .client_new = rd_kafka_sasl_cyrus_client_new, |
| .recv = rd_kafka_sasl_cyrus_recv, |
| .close = rd_kafka_sasl_cyrus_close, |
| .broker_init = rd_kafka_sasl_cyrus_broker_init, |
| .broker_term = rd_kafka_sasl_cyrus_broker_term, |
| .conf_validate = rd_kafka_sasl_cyrus_conf_validate |
| }; |