blob: b81ff87b5a507909a470f0a9d176fda4659d1710 [file] [log] [blame]
/*
* librdkafka - The Apache Kafka C/C++ library
*
* Copyright (c) 2016 Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
/**
* Implements SASL Kerberos GSSAPI authentication client
* using the native Win32 SSPI.
*/
#include "rdkafka_int.h"
#include "rdkafka_transport.h"
#include "rdkafka_transport_int.h"
#include "rdkafka_sasl.h"
#include "rdkafka_sasl_int.h"
#include <stdio.h>
#include <windows.h>
#include <ntsecapi.h>
#define SECURITY_WIN32
#pragma comment(lib, "Secur32.lib")
#include <Sspi.h>
/* Context requirement flags passed to every InitializeSecurityContext()
 * call made by rd_kafka_sasl_sspi_continue(); that function ORs in
 * additional flags on its first call. */
#define RD_KAFKA_SASL_SSPI_CTX_ATTRS \
(ISC_REQ_CONFIDENTIALITY | ISC_REQ_REPLAY_DETECT | \
ISC_REQ_SEQUENCE_DETECT | ISC_REQ_CONNECTION)
/* Default maximum kerberos token size for newer versions of Windows.
 * Used as the size, in bytes, of the on-stack output token buffer in
 * rd_kafka_sasl_sspi_continue(). */
#define RD_KAFKA_SSPI_MAX_TOKEN_SIZE 48000
/**
* @brief Per-connection SASL state
*
* Allocated by rd_kafka_sasl_win32_client_new() and released by
* rd_kafka_sasl_win32_close().
*/
typedef struct rd_kafka_sasl_win32_state_s {
/* Heap-allocated credentials handle, created by
 * rd_kafka_sasl_sspi_cred_new(). */
CredHandle *cred;
/* Heap-allocated security context handle.
 * NULL until the first InitializeSecurityContext() call has been made. */
CtxtHandle *ctx;
/* Target name passed to InitializeSecurityContext(), formatted as
 * "<configured sasl principal>/<broker hostname>". */
wchar_t principal[512];
} rd_kafka_sasl_win32_state_t;
/**
 * @brief Translate an SSPI status code to a human readable description.
 *
 * @param sr Status code as returned by an SSPI function.
 *
 * @returns a pointer to a static string describing \p sr, or a generic
 *          placeholder string for codes that have no description here.
 */
static const char *rd_kafka_sasl_sspi_err2str (SECURITY_STATUS sr) {
        /* Known status codes and their descriptions. */
        static const struct {
                SECURITY_STATUS code;
                const char *descr;
        } known[] = {
                { SEC_E_INSUFFICIENT_MEMORY, "Insufficient memory" },
                { SEC_E_INTERNAL_ERROR, "Internal error" },
                { SEC_E_INVALID_HANDLE, "Invalid handle" },
                { SEC_E_INVALID_TOKEN, "Invalid token" },
                { SEC_E_LOGON_DENIED, "Logon denied" },
                { SEC_E_NO_AUTHENTICATING_AUTHORITY,
                  "No authority could be contacted for authentication." },
                { SEC_E_NO_CREDENTIALS, "No credentials" },
                { SEC_E_TARGET_UNKNOWN, "Target unknown" },
                { SEC_E_UNSUPPORTED_FUNCTION, "Unsupported functionality" },
                { SEC_E_WRONG_CREDENTIAL_HANDLE,
                  "The principal that received the authentication "
                  "request is not the same as the one passed "
                  "into the pszTargetName parameter. "
                  "This indicates a failure in mutual "
                  "authentication." },
        };
        size_t i;

        for (i = 0 ; i < sizeof(known) / sizeof(known[0]) ; i++) {
                if (known[i].code == sr)
                        return known[i].descr;
        }

        return "(no string representation)";
}
/**
 * @brief Acquire a new outbound Kerberos credentials handle.
 *
 * No principal or authentication data is supplied to
 * AcquireCredentialsHandle(), so the package's default credentials
 * are requested.
 *
 * @param rktrans Transport; only used for debug logging.
 * @param errstr Buffer receiving a human readable error on failure.
 * @param errstr_size Size of \p errstr in bytes.
 *
 * @returns a newly allocated CredHandle on success, or NULL on failure
 *          (in which case \p errstr has been written).
 *
 * @remark The returned handle is heap allocated; rd_kafka_sasl_win32_close()
 *         releases it with FreeCredentialsHandle() followed by rd_free().
 */
static CredHandle *
rd_kafka_sasl_sspi_cred_new (rd_kafka_transport_t *rktrans,
                             char *errstr, size_t errstr_size) {
        TimeStamp expiry = { 0, 0 };
        SECURITY_STATUS sr;
        CredHandle *cred = rd_calloc(1, sizeof(*cred));

        sr = AcquireCredentialsHandle(
                NULL, __TEXT("Kerberos"), SECPKG_CRED_OUTBOUND,
                NULL, NULL, NULL, NULL, cred, &expiry);

        if (sr != SEC_E_OK) {
                rd_free(cred);
                /* Report the failure the same way as every other
                 * SSPI error in this file: description + hex code. */
                rd_snprintf(errstr, errstr_size,
                            "Failed to acquire CredentialsHandle: "
                            "%s (0x%x)",
                            rd_kafka_sasl_sspi_err2str(sr), sr);
                return NULL;
        }

        /* Casts make the arguments match the %d conversions. */
        rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASL",
                   "Acquired Kerberos credentials handle (expiry in %d.%ds)",
                   (int)expiry.u.HighPart, (int)expiry.u.LowPart);

        return cred;
}
/**
 * @brief Perform one step of the SSPI security context negotiation.
 *
 * Passes the broker's token (if any) to InitializeSecurityContext() and
 * sends the produced output token to the broker with rd_kafka_sasl_send().
 * The context handle is allocated on the first call and stored in the
 * per-connection state regardless of the call's outcome.
 *
 * @param rktrans Transport whose SASL state is being negotiated.
 * @param inbuf Token received from the broker, or NULL on the initial call.
 * @param insize Size of \p inbuf in bytes.
 * @param errstr Buffer receiving a human readable error on failure.
 * @param errstr_size Size of \p errstr in bytes.
 *
 * @returns 0 on success or -1 on failure (\p errstr has been written).
 */
static int rd_kafka_sasl_sspi_continue (rd_kafka_transport_t *rktrans,
                                        const void *inbuf, size_t insize,
                                        char *errstr, size_t errstr_size) {
        rd_kafka_sasl_win32_state_t *state = rktrans->rktrans_sasl.state;
        /* No context handle yet means this is the initial call. */
        const int first_call = !state->ctx;
        BYTE token[RD_KAFKA_SSPI_MAX_TOKEN_SIZE];
        SecBuffer in_sb, out_sb;
        SecBufferDesc in_desc, out_desc;
        TimeStamp lifespan = { 0, 0 };
        ULONG req_attrs = RD_KAFKA_SASL_SSPI_CTX_ATTRS;
        ULONG got_attrs;
        CtxtHandle *newctx;
        SECURITY_STATUS sr;

        if (inbuf) {
                if (insize > ULONG_MAX) {
                        rd_snprintf(errstr, errstr_size,
                                    "Input buffer length too large (%"PRIusz") "
                                    "and would overflow", insize);
                        return -1;
                }

                /* Wrap the broker's token for SSPI */
                in_sb.BufferType = SECBUFFER_TOKEN;
                in_sb.cbBuffer = (unsigned long)insize;
                in_sb.pvBuffer = (void *)inbuf;

                in_desc.ulVersion = SECBUFFER_VERSION;
                in_desc.cBuffers = 1;
                in_desc.pBuffers = &in_sb;
        }

        /* Output token is written to the on-stack buffer */
        out_sb.BufferType = SECBUFFER_TOKEN;
        out_sb.cbBuffer = sizeof(token);
        out_sb.pvBuffer = token;

        out_desc.ulVersion = SECBUFFER_VERSION;
        out_desc.cBuffers = 1;
        out_desc.pBuffers = &out_sb;

        if (first_call) {
                /* First time: allocate context handle
                 * which will be filled in by Initialize..(),
                 * and request the extra first-call attributes. */
                newctx = rd_calloc(1, sizeof(*newctx));
                req_attrs |= ISC_REQ_MUTUAL_AUTH | ISC_REQ_IDENTIFY;
        } else {
                newctx = state->ctx;
        }

        sr = InitializeSecurityContext(
                state->cred, state->ctx, state->principal,
                req_attrs,
                0, SECURITY_NATIVE_DREP,
                inbuf ? &in_desc : NULL,
                0, newctx, &out_desc, &got_attrs, &lifespan);

        if (first_call)
                state->ctx = newctx;

        if (sr == SEC_I_COMPLETE_NEEDED ||
            sr == SEC_I_COMPLETE_AND_CONTINUE) {
                rd_snprintf(errstr, errstr_size,
                            "CompleteAuthToken (Digest auth, %d) "
                            "not implemented", sr);
                return -1;
        }

        if (sr == SEC_I_INCOMPLETE_CREDENTIALS) {
                rd_snprintf(errstr, errstr_size,
                            "Incomplete credentials: "
                            "invalid or untrusted certificate");
                return -1;
        }

        if (sr != SEC_E_OK && sr != SEC_I_CONTINUE_NEEDED) {
                rd_snprintf(errstr, errstr_size,
                            "InitializeSecurityContext "
                            "failed: %s (0x%x)",
                            rd_kafka_sasl_sspi_err2str(sr), sr);
                return -1;
        }

        if (sr == SEC_E_OK) {
                rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASLAUTH",
                           "Initialized security context");
                /* Subsequent frames are handled by the token
                 * validation path in rd_kafka_sasl_win32_recv(). */
                rktrans->rktrans_sasl.complete = 1;
        }

        if (rd_kafka_sasl_send(rktrans,
                               out_sb.pvBuffer, out_sb.cbBuffer,
                               errstr, errstr_size) == -1)
                return -1;

        return 0;
}
/**
 * @brief Sends the token response to the broker
 *
 * The response consists of four buffers that are passed through
 * EncryptMessage() (with KERB_WRAP_NO_ENCRYPT) and then concatenated in
 * order: security trailer, a copy of \p server_token, the credentials'
 * user name (including its NUL terminator) and padding.
 *
 * @param rktrans Transport to send the response on.
 * @param errstr Buffer receiving a human readable error on failure.
 * @param errstr_size Size of \p errstr in bytes.
 * @param server_token Unwrapped token received from the broker; its
 *                     contents are copied, not modified.
 *
 * @returns the return value of rd_kafka_sasl_send() if the response could
 *          be constructed, else -1 (\p errstr has been written).
 */
static int rd_kafka_sasl_win32_send_response (rd_kafka_transport_t *rktrans,
                                              char *errstr,
                                              size_t errstr_size,
                                              SecBuffer *server_token) {
        rd_kafka_sasl_win32_state_t *state = rktrans->rktrans_sasl.state;
        SECURITY_STATUS sr;
        SecBuffer buffers[4];
        SecBufferDesc buffer_desc;
        SecPkgContext_Sizes sizes;
        SecPkgCredentials_NamesA names;
        unsigned char *out;
        unsigned long outlen = 0;
        unsigned long of = 0;
        unsigned long i;
        size_t namelen;
        int send_response = -1;

        sr = QueryContextAttributes(state->ctx, SECPKG_ATTR_SIZES, &sizes);
        if (sr != SEC_E_OK) {
                rd_snprintf(errstr, errstr_size,
                            "Send response failed: %s (0x%x)",
                            rd_kafka_sasl_sspi_err2str(sr), sr);
                return -1;
        }

        RD_MEMZERO(names);
        sr = QueryCredentialsAttributesA(state->cred, SECPKG_CRED_ATTR_NAMES,
                                         &names);
        if (sr != SEC_E_OK) {
                rd_snprintf(errstr, errstr_size,
                            "Query credentials failed: %s (0x%x)",
                            rd_kafka_sasl_sspi_err2str(sr), sr);
                return -1;
        }

        /* From here on names.sUserName must be released with
         * FreeContextBuffer() on every return path. */

        rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASLAUTH",
                   "Sending response message for user: %s", names.sUserName);

        namelen = strlen(names.sUserName) + 1;
        if (namelen > ULONG_MAX) {
                rd_snprintf(errstr, errstr_size,
                            "User name length too large (%"PRIusz") "
                            "and would overflow", namelen);
                FreeContextBuffer(names.sUserName);
                return -1;
        }

        buffer_desc.cBuffers = 4;
        buffer_desc.pBuffers = buffers;
        buffer_desc.ulVersion = SECBUFFER_VERSION;

        /* security trailer */
        buffers[0].cbBuffer = sizes.cbSecurityTrailer;
        buffers[0].BufferType = SECBUFFER_TOKEN;
        buffers[0].pvBuffer = rd_calloc(1, sizes.cbSecurityTrailer);

        /* protection level and buffer size received from the server */
        buffers[1].cbBuffer = server_token->cbBuffer;
        buffers[1].BufferType = SECBUFFER_DATA;
        buffers[1].pvBuffer = rd_calloc(1, server_token->cbBuffer);
        memcpy(buffers[1].pvBuffer, server_token->pvBuffer,
               server_token->cbBuffer);

        /* user principal */
        buffers[2].cbBuffer = (unsigned long)namelen;
        buffers[2].BufferType = SECBUFFER_DATA;
        buffers[2].pvBuffer = rd_calloc(1, buffers[2].cbBuffer);
        memcpy(buffers[2].pvBuffer, names.sUserName, namelen);

        /* padding: the allocation must be at least as large as the
         * advertised cbBuffer since EncryptMessage() may write that many
         * bytes. Never request a zero-sized allocation. */
        buffers[3].cbBuffer = sizes.cbBlockSize;
        buffers[3].BufferType = SECBUFFER_PADDING;
        buffers[3].pvBuffer = rd_calloc(1, buffers[3].cbBuffer ?
                                        buffers[3].cbBuffer : 1);

        sr = EncryptMessage(state->ctx, KERB_WRAP_NO_ENCRYPT, &buffer_desc, 0);
        if (sr != SEC_E_OK) {
                rd_snprintf(errstr, errstr_size,
                            "Encrypt message failed: %s (0x%x)",
                            rd_kafka_sasl_sspi_err2str(sr), sr);
                goto done;
        }

        /* Concatenate the buffers, using the sizes as they are
         * after the EncryptMessage() call. */
        for (i = 0 ; i < buffer_desc.cBuffers ; i++)
                outlen += buffers[i].cbBuffer;

        out = rd_calloc(1, outlen);

        for (i = 0 ; i < buffer_desc.cBuffers ; i++) {
                memcpy(out + of, buffers[i].pvBuffer, buffers[i].cbBuffer);
                of += buffers[i].cbBuffer;
        }

        send_response = rd_kafka_sasl_send(rktrans, out, outlen,
                                           errstr, errstr_size);

        rd_free(out);

 done:
        FreeContextBuffer(names.sUserName);
        for (i = 0 ; i < buffer_desc.cBuffers ; i++)
                rd_free(buffers[i].pvBuffer);

        return send_response;
}
/**
 * @brief Unwrap and validate token response from broker.
 *
 * The frame is unwrapped with DecryptMessage(); the unwrapped data must be
 * at least 4 bytes long and have bit 0 of its first byte set (presumably
 * the "no security layer" option of RFC 4752 - TODO confirm).
 * On success the client's response is sent with
 * rd_kafka_sasl_win32_send_response().
 *
 * @param rktrans Transport the frame was received on.
 * @param inbuf Wrapped token received from the broker.
 * @param insize Size of \p inbuf in bytes.
 * @param errstr Buffer receiving a human readable error on failure.
 * @param errstr_size Size of \p errstr in bytes.
 *
 * @returns -1 on failure (\p errstr has been written), else the return
 *          value of rd_kafka_sasl_win32_send_response().
 */
static int rd_kafka_sasl_win32_validate_token (rd_kafka_transport_t *rktrans,
                                               const void *inbuf,
                                               size_t insize,
                                               char *errstr,
                                               size_t errstr_size) {
        rd_kafka_sasl_win32_state_t *state = rktrans->rktrans_sasl.state;
        SecBuffer buffers[2];
        SecBufferDesc buffer_desc;
        SECURITY_STATUS sr;
        char supported;

        if (insize > ULONG_MAX) {
                rd_snprintf(errstr, errstr_size,
                            "Input buffer length too large (%"PRIusz") "
                            "and would overflow", insize);
                return -1;
        }

        buffer_desc.cBuffers = 2;
        buffer_desc.pBuffers = buffers;
        buffer_desc.ulVersion = SECBUFFER_VERSION;

        /* Input: the complete wrapped frame */
        buffers[0].cbBuffer = (unsigned long)insize;
        buffers[0].BufferType = SECBUFFER_STREAM;
        buffers[0].pvBuffer = (void *)inbuf;

        /* Output: filled in by DecryptMessage() */
        buffers[1].cbBuffer = 0;
        buffers[1].BufferType = SECBUFFER_DATA;
        buffers[1].pvBuffer = NULL;

        sr = DecryptMessage(state->ctx, &buffer_desc, 0, NULL);
        if (sr != SEC_E_OK) {
                rd_snprintf(errstr, errstr_size,
                            "Decrypt message failed: %s (0x%x)",
                            rd_kafka_sasl_sspi_err2str(sr), sr);
                return -1;
        }

        if (buffers[1].cbBuffer < 4) {
                rd_snprintf(errstr, errstr_size,
                            "Validate token: "
                            "invalid message");
                return -1;
        }

        supported = ((char *)buffers[1].pvBuffer)[0];
        if (!(supported & 1)) {
                rd_snprintf(errstr, errstr_size,
                            "Validate token: "
                            "server does not support layer");
                return -1;
        }

        rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASLAUTH",
                   "Validated server token");

        return rd_kafka_sasl_win32_send_response(rktrans, errstr,
                                                 errstr_size, &buffers[1]);
}
/**
 * @brief Handle SASL frame received from broker.
 *
 * While the security context is still being negotiated the frame is passed
 * to rd_kafka_sasl_sspi_continue(). Once the context has been established
 * the frame is validated and answered by
 * rd_kafka_sasl_win32_validate_token(), after which authentication is
 * reported as done.
 *
 * @param rktrans Transport the frame was received on.
 * @param buf Frame payload.
 * @param size Size of \p buf in bytes.
 * @param errstr Buffer receiving a human readable error on failure.
 * @param errstr_size Size of \p errstr in bytes.
 *
 * @returns 0 on success or -1 on failure (\p errstr has been written).
 */
static int rd_kafka_sasl_win32_recv (struct rd_kafka_transport_s *rktrans,
                                     const void *buf, size_t size,
                                     char *errstr, size_t errstr_size) {
        /* Security context not yet established: keep negotiating. */
        if (!rktrans->rktrans_sasl.complete)
                return rd_kafka_sasl_sspi_continue(rktrans, buf, size,
                                                   errstr, errstr_size);

        if (rd_kafka_sasl_win32_validate_token(
                    rktrans, buf, size, errstr, errstr_size) == -1) {
                rktrans->rktrans_sasl.complete = 0;
                return -1;
        }

        /* Final ack from broker. */
        rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASLAUTH",
                   "Authenticated");
        rd_kafka_sasl_auth_done(rktrans);

        return 0;
}
/**
 * @brief Decommission SSPI state
 *
 * Releases the security context and credentials handles (if present)
 * along with the state object itself, and clears the transport's state
 * pointer so that a repeated call is a no-op rather than a double free.
 *
 * @param rktrans Transport whose SASL state is to be released.
 */
static void rd_kafka_sasl_win32_close (rd_kafka_transport_t *rktrans) {
        rd_kafka_sasl_win32_state_t *state = rktrans->rktrans_sasl.state;

        if (!state)
                return;

        if (state->ctx) {
                DeleteSecurityContext(state->ctx);
                rd_free(state->ctx);
        }

        if (state->cred) {
                FreeCredentialsHandle(state->cred);
                rd_free(state->cred);
        }

        rd_free(state);

        /* Don't leave a dangling pointer behind. */
        rktrans->rktrans_sasl.state = NULL;
}
/**
 * @brief Set up per-connection SSPI state and start authentication.
 *
 * Allocates the state object, constructs the target principal as
 * "<configured sasl principal>/<hostname>", acquires a credentials handle
 * and performs the initial rd_kafka_sasl_sspi_continue() step.
 *
 * @param rktrans Transport to authenticate.
 * @param hostname Broker hostname, used as the second principal component.
 * @param errstr Buffer receiving a human readable error on failure.
 * @param errstr_size Size of \p errstr in bytes.
 *
 * @returns 0 on success or -1 on failure (\p errstr has been written).
 *
 * @remark Once allocated, the state stays attached to \p rktrans even on
 *         failure; presumably the caller releases it through
 *         rd_kafka_sasl_win32_close() - TODO confirm against the caller.
 */
static int rd_kafka_sasl_win32_client_new (rd_kafka_transport_t *rktrans,
                                           const char *hostname,
                                           char *errstr, size_t errstr_size) {
        rd_kafka_t *rk = rktrans->rktrans_rkb->rkb_rk;
        rd_kafka_sasl_win32_state_t *state;
        int r;

        if (strcmp(rk->rk_conf.sasl.mechanisms, "GSSAPI")) {
                rd_snprintf(errstr, errstr_size,
                            "SASL mechanism \"%s\" not supported on platform",
                            rk->rk_conf.sasl.mechanisms);
                return -1;
        }

        state = rd_calloc(1, sizeof(*state));
        rktrans->rktrans_sasl.state = state;

        /* _snwprintf() does not NUL-terminate when the output fills the
         * buffer: reserve the last element for the terminator and treat
         * truncation (negative return value) as an error rather than
         * authenticating against a cut-off principal. */
        r = _snwprintf(state->principal, RD_ARRAYSIZE(state->principal) - 1,
                       L"%hs/%hs",
                       rk->rk_conf.sasl.principal,
                       hostname);
        state->principal[RD_ARRAYSIZE(state->principal) - 1] = L'\0';
        if (r < 0) {
                rd_snprintf(errstr, errstr_size,
                            "Failed to construct target principal "
                            "\"%s/%s\": too long or not convertible",
                            rk->rk_conf.sasl.principal, hostname);
                return -1;
        }

        state->cred = rd_kafka_sasl_sspi_cred_new(rktrans, errstr,
                                                  errstr_size);
        if (!state->cred)
                return -1;

        if (rd_kafka_sasl_sspi_continue(rktrans, NULL, 0,
                                        errstr, errstr_size) == -1)
                return -1;

        return 0;
}
/**
 * @brief Verify that the configuration required by this provider is set.
 *
 * @param rk Client instance whose configuration is checked.
 * @param errstr Buffer receiving a human readable error on failure.
 * @param errstr_size Size of \p errstr in bytes.
 *
 * @returns 0 if the SASL principal is configured, else -1
 *          (\p errstr has been written).
 */
static int rd_kafka_sasl_win32_conf_validate (rd_kafka_t *rk,
                                              char *errstr,
                                              size_t errstr_size) {
        if (rk->rk_conf.sasl.principal)
                return 0;

        rd_snprintf(errstr, errstr_size,
                    "sasl.kerberos.principal must be set");
        return -1;
}
/**
 * @brief SASL provider definition exposing the Win32 SSPI callbacks
 *        implemented in this file.
 */
const struct rd_kafka_sasl_provider rd_kafka_sasl_win32_provider = {
.name = "Win32 SSPI",
.client_new = rd_kafka_sasl_win32_client_new,
.recv = rd_kafka_sasl_win32_recv,
.close = rd_kafka_sasl_win32_close,
.conf_validate = rd_kafka_sasl_win32_conf_validate
};