blob: 720167ba05f5dd8b27685e487c027556ec31fd98 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "common/kerberos/kerberos_ticket_mgr.h"
#include <chrono>
#include <iomanip>
#include <sstream>
#include "common/logging.h"
#include "core/block/block.h"
#include "information_schema/schema_scanner_helper.h"
#include "service/backend_options.h"
namespace doris::kerberos {
KerberosTicketMgr::KerberosTicketMgr(const std::string& root_path) {
_root_path = root_path;
_start_cleanup_thread();
}
KerberosTicketMgr::~KerberosTicketMgr() {
_stop_cleanup_thread();
}
void KerberosTicketMgr::_start_cleanup_thread() {
_cleanup_thread = std::make_unique<std::thread>(&KerberosTicketMgr::_cleanup_loop, this);
}
void KerberosTicketMgr::_stop_cleanup_thread() {
if (_cleanup_thread) {
_should_stop_cleanup_thread = true;
_cleanup_thread->join();
_cleanup_thread.reset();
}
}
void KerberosTicketMgr::_cleanup_loop() {
#ifdef BE_TEST
static constexpr int64_t CHECK_INTERVAL_SECONDS = 1; // For testing purpose
#else
static constexpr int64_t CHECK_INTERVAL_SECONDS = 5; // Check stop flag every 5 seconds
#endif
uint64_t last_cleanup_time = std::time(nullptr);
while (!_should_stop_cleanup_thread) {
uint64_t current_time = std::time(nullptr);
// Only perform cleanup if enough time has passed
if (current_time - last_cleanup_time >= _cleanup_interval.count()) {
std::vector<std::string> keys_to_remove;
{
std::lock_guard<std::mutex> lock(_mutex);
for (const auto& entry : _ticket_caches) {
// Check if this is the last reference to the ticket cache
// use_count() == 1 means it is only referenced in the _ticket_caches map
if (entry.second.cache.use_count() == 1) {
LOG(INFO) << "Found unused Kerberos ticket cache for principal: "
<< entry.second.cache->get_config().get_principal()
<< ", keytab: "
<< entry.second.cache->get_config().get_keytab_path();
keys_to_remove.push_back(entry.first);
}
}
// Remove entries under lock
for (const auto& key : keys_to_remove) {
LOG(INFO) << "Removing unused Kerberos ticket cache for key: " << key;
_ticket_caches.erase(key);
}
}
last_cleanup_time = current_time;
}
// Sleep for a short interval to check stop flag more frequently
std::this_thread::sleep_for(std::chrono::seconds(CHECK_INTERVAL_SECONDS));
}
}
Status KerberosTicketMgr::get_or_set_ticket_cache(
const KerberosConfig& config, std::shared_ptr<KerberosTicketCache>* ticket_cache) {
std::string key = config.get_hash_code();
std::lock_guard<std::mutex> lock(_mutex);
// Check if already exists
auto it = _ticket_caches.find(key);
if (it != _ticket_caches.end()) {
*ticket_cache = it->second.cache;
return Status::OK();
}
// Create new ticket cache
auto new_ticket_cache = _make_new_ticket_cache(config);
RETURN_IF_ERROR(new_ticket_cache->initialize());
RETURN_IF_ERROR(new_ticket_cache->login());
RETURN_IF_ERROR(new_ticket_cache->write_ticket_cache());
new_ticket_cache->start_periodic_refresh();
// Insert into _ticket_caches
KerberosTicketEntry entry {.cache = new_ticket_cache,
.last_access_time = std::chrono::steady_clock::now()};
auto [inserted_it, success] = _ticket_caches.emplace(key, std::move(entry));
if (!success) {
return Status::InternalError("Failed to insert ticket cache into map");
}
*ticket_cache = new_ticket_cache;
LOG(INFO) << "create new kerberos ticket cache: "
<< inserted_it->second.cache->get_ticket_cache_path();
return Status::OK();
}
std::shared_ptr<KerberosTicketCache> KerberosTicketMgr::get_ticket_cache(
const std::string& principal, const std::string& keytab_path) {
std::string key = KerberosConfig::get_hash_code(principal, keytab_path);
std::lock_guard<std::mutex> lock(_mutex);
auto it = _ticket_caches.find(key);
if (it != _ticket_caches.end()) {
return it->second.cache;
}
return nullptr;
}
std::shared_ptr<KerberosTicketCache> KerberosTicketMgr::_make_new_ticket_cache(
const KerberosConfig& config) {
return std::make_shared<KerberosTicketCache>(config, _root_path);
}
std::vector<KerberosTicketInfo> KerberosTicketMgr::get_krb_ticket_cache_info() {
std::vector<KerberosTicketInfo> result;
std::lock_guard<std::mutex> lock(_mutex);
for (const auto& entry : _ticket_caches) {
auto cache_info = entry.second.cache->get_ticket_info();
result.insert(result.end(), cache_info.begin(), cache_info.end());
}
return result;
}
void KerberosTicketMgr::get_ticket_cache_info_block(Block* block, const cctz::time_zone& ctz) {
TBackend be = BackendOptions::get_local_backend();
int64_t be_id = be.id;
std::string be_ip = be.host;
std::vector<KerberosTicketInfo> infos = get_krb_ticket_cache_info();
for (auto& info : infos) {
SchemaScannerHelper::insert_int64_value(0, be_id, block);
SchemaScannerHelper::insert_string_value(1, be_ip, block);
SchemaScannerHelper::insert_string_value(2, info.principal, block);
SchemaScannerHelper::insert_string_value(3, info.keytab_path, block);
SchemaScannerHelper::insert_string_value(4, info.service_principal, block);
SchemaScannerHelper::insert_string_value(5, info.cache_path, block);
SchemaScannerHelper::insert_string_value(6, info.hash_code, block);
SchemaScannerHelper::insert_datetime_value(7, info.start_time, ctz, block);
SchemaScannerHelper::insert_datetime_value(8, info.expiry_time, ctz, block);
SchemaScannerHelper::insert_datetime_value(9, info.auth_time, ctz, block);
SchemaScannerHelper::insert_int64_value(10, info.use_count, block);
SchemaScannerHelper::insert_int64_value(11, info.refresh_interval_second, block);
}
}
} // namespace doris::kerberos