blob: 79ceb03dd773c2e50f560a16d724d8c11f5a8fa9 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <aws/core/auth/AWSCredentialsProvider.h>
#include <librdkafka/rdkafkacpp.h>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include "common/status.h"
namespace doris {
/**
* AWS MSK IAM authentication token generator.
*
* This class generates SASL/OAUTHBEARER tokens for AWS MSK IAM authentication.
* It uses AWS SDK for C++ to obtain IAM credentials and generates signed tokens
* that can be used with librdkafka's OAUTHBEARER mechanism.
*/
class AwsMskIamAuth {
public:
struct Config {
std::string region; // AWS region (e.g., "us-east-1"), required
std::string access_key; // AWS Access Key ID (optional, for explicit credentials)
std::string secret_key; // AWS Secret Access Key (optional, for explicit credentials)
std::string role_arn; // IAM role ARN (optional, for assume role)
std::string external_id; // Optional external ID for STS AssumeRole
std::string profile_name; // AWS profile name (optional, reads from ~/.aws/credentials)
std::string
credentials_provider; // Credentials provider type (optional, e.g., "ENV", "INSTANCE_PROFILE")
int token_refresh_margin_ms = 60000; // Refresh token 60s before expiry
};
explicit AwsMskIamAuth(Config config);
~AwsMskIamAuth() = default;
/**
* Generate AWS MSK IAM authentication token.
*
* The token is a base64url-encoded presigned URL following AWS SigV4 format:
* https://kafka.<region>.amazonaws.com/?Action=kafka-cluster:Connect
* &X-Amz-Algorithm=AWS4-HMAC-SHA256
* &X-Amz-Credential=<access-key>/<date>/<region>/kafka-cluster/aws4_request
* &X-Amz-Date=<timestamp>
* &X-Amz-Expires=900
* &X-Amz-SignedHeaders=host
* &X-Amz-Signature=<signature>
* &X-Amz-Security-Token=<session-token> // if using temporary credentials
* &User-Agent=doris-msk-iam-auth/1.0
*
* Reference: https://github.com/aws/aws-msk-iam-sasl-signer-python
*
* @param broker_hostname The MSK broker hostname (used for logging, not in token)
* @param token Output: base64url-encoded signed URL token
* @param token_lifetime_ms Output: token lifetime in milliseconds (3600000ms = 1 hour)
* @return Status indicating success or failure
*/
Status generate_token(const std::string& broker_hostname, std::string* token,
int64_t* token_lifetime_ms);
/**
* Get current AWS credentials.
* This will refresh credentials if they are expired or about to expire.
*/
Status get_credentials(Aws::Auth::AWSCredentials* credentials);
private:
// Create AWS credentials provider based on configuration
std::shared_ptr<Aws::Auth::AWSCredentialsProvider> _create_credentials_provider();
std::shared_ptr<Aws::Auth::AWSCredentialsProvider> _create_provider_from_type(
const std::string& provider_type);
std::shared_ptr<Aws::Auth::AWSCredentialsProvider> _create_assume_role_base_provider();
// HMAC-SHA256 returning hex string
std::string _hmac_sha256_hex(const std::string& key, const std::string& data);
std::string _url_encode(const std::string& value);
std::string _base64url_encode(const std::string& input);
// Calculate AWS SigV4 signing key
std::string _calculate_signing_key(const std::string& secret_key, const std::string& date_stamp,
const std::string& region, const std::string& service);
std::string _hmac_sha256(const std::string& key, const std::string& data);
std::string _sha256(const std::string& data);
std::string _get_timestamp();
std::string _get_date_stamp(const std::string& timestamp);
bool _should_refresh_credentials();
Config _config;
std::shared_ptr<Aws::Auth::AWSCredentialsProvider> _credentials_provider;
std::mutex _mutex;
Aws::Auth::AWSCredentials _cached_credentials;
std::chrono::time_point<std::chrono::system_clock> _credentials_expiry;
};
/**
* librdkafka OAUTHBEARER callback for AWS MSK IAM authentication.
*
* This callback is invoked by librdkafka when it needs to refresh the
* OAUTHBEARER token. It uses AwsMskIamAuth to generate the token.
*/
class AwsMskIamOAuthCallback : public RdKafka::OAuthBearerTokenRefreshCb {
public:
/**
* Create an OAuth callback from Kafka custom properties.
*
* This factory method checks if AWS MSK IAM authentication is configured
* (security.protocol=SASL_SSL and sasl.mechanism=OAUTHBEARER) and creates
* the callback with proper configuration.
*
* @param custom_properties Kafka custom properties map
* @param brokers Kafka broker list (comma-separated)
* @return unique_ptr to callback if IAM auth is configured, nullptr otherwise
*/
static std::unique_ptr<AwsMskIamOAuthCallback> create_from_properties(
const std::unordered_map<std::string, std::string>& custom_properties,
const std::string& brokers);
explicit AwsMskIamOAuthCallback(std::shared_ptr<AwsMskIamAuth> auth,
std::string broker_hostname);
/**
* Synchronously refresh and set OAuth token.
* Can be called directly during initialization or by the callback.
*
* @param handle The Kafka handle (consumer or producer)
* @return Status indicating success or failure
*/
Status refresh_now(RdKafka::Handle* handle);
/**
* Callback invoked by librdkafka to refresh OAuth token.
*
* @param handle The Kafka handle (consumer or producer)
* @param oauthbearer_config Configuration string from 'sasl.oauthbearer.config'
*/
void oauthbearer_token_refresh_cb(RdKafka::Handle* handle,
const std::string& oauthbearer_config) override;
private:
std::shared_ptr<AwsMskIamAuth> _auth;
std::string _broker_hostname;
};
} // namespace doris