blob: 5ea1ef41409411e0c9f594839f699bc30f3cdefe [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "udf/python/python_server.h"
#include <arrow/type_fwd.h>
#include <butil/fd_utility.h>
#include <dirent.h>
#include <fmt/core.h>
#include <sys/poll.h>
#include <sys/stat.h>
#include <boost/asio.hpp>
#include <boost/process.hpp>
#include <chrono>
#include <fstream>
#include <future>
#include <thread>
#include "arrow/flight/client.h"
#include "common/config.h"
#include "common/status.h"
#include "udf/python/python_udaf_client.h"
#include "udf/python/python_udf_client.h"
#include "udf/python/python_udtf_client.h"
#include "util/cpu_info.h"
namespace doris {
// Returns the process pool registered for `version`, creating and registering an
// empty VersionedProcessPool on first use. The lookup and the lazy creation both
// happen while holding _pools_mutex.
std::shared_ptr<PythonServerManager::VersionedProcessPool>
PythonServerManager::_get_or_create_process_pool(const PythonVersion& version) {
    std::lock_guard<std::mutex> guard(_pools_mutex);
    // try_emplace value-initializes the mapped pointer when the key is new, which is
    // the same state operator[] would leave behind.
    auto entry = _process_pools.try_emplace(version).first;
    if (entry->second == nullptr) {
        entry->second = std::make_shared<VersionedProcessPool>();
    }
    return entry->second;
}
// Copies the current (version, pool) entries into a vector while holding
// _pools_mutex, so callers can iterate over the pools without keeping the
// registry lock. The pools themselves are shared, not copied.
std::vector<std::pair<PythonVersion, std::shared_ptr<PythonServerManager::VersionedProcessPool>>>
PythonServerManager::_snapshot_process_pools() {
    using Entry = std::pair<PythonVersion, std::shared_ptr<VersionedProcessPool>>;
    std::lock_guard<std::mutex> guard(_pools_mutex);
    // Range construction sizes the vector once and copies every entry in registry order.
    return std::vector<Entry>(_process_pools.begin(), _process_pools.end());
}
#ifdef BE_TEST
// Test-only hook: replaces the process list and the `initialized` flag of the pool
// for `version` (creating the pool if needed), under that pool's mutex.
void PythonServerManager::set_process_pool_for_test(const PythonVersion& version,
                                                    std::vector<ProcessPtr> processes,
                                                    bool initialized) {
    const auto target_pool = _get_or_create_process_pool(version);
    std::lock_guard<std::mutex> guard(target_pool->mutex);
    target_pool->initialized = initialized;
    target_pool->processes = std::move(processes);
}
// Test-only hook: exposes the process list of the pool for `version` (creating the
// pool if needed). The reference is handed out without taking the pool mutex, so
// callers must not use it concurrently with other pool users.
std::vector<ProcessPtr>& PythonServerManager::process_pool_for_test(const PythonVersion& version) {
    // The registry keeps its own reference to the pool, so the returned reference
    // stays valid after the temporary shared_ptr goes away.
    return _get_or_create_process_pool(version)->processes;
}
#endif
// Creates a client of type ClientType bound to a Python server process of the
// requested version.
//
// Steps: make sure the pool for `version` is initialized, pick a process from it,
// then delegate to ClientType::create. `data_schema` is forwarded only when
// ClientType is PythonUDAFClient; the other client types ignore it.
// Returns the first error encountered, or OK with `*client` filled by create().
template <typename ClientType>
Status PythonServerManager::get_client(const PythonUDFMeta& func_meta, const PythonVersion& version,
                                       std::shared_ptr<ClientType>* client,
                                       const std::shared_ptr<arrow::Schema>& data_schema) {
    std::shared_ptr<VersionedProcessPool> pool_for_version =
            DORIS_TRY(_ensure_pool_initialized(version));

    ProcessPtr selected;
    RETURN_IF_ERROR(_get_process(version, pool_for_version, &selected));

    if constexpr (std::is_same_v<ClientType, PythonUDAFClient>) {
        return ClientType::create(func_meta, std::move(selected), data_schema, client);
    } else {
        return ClientType::create(func_meta, std::move(selected), client);
    }
}
// Returns the pool for `version`, spawning its Python processes on first use.
//
// The whole initialization runs under the pool's mutex. Pool size is
// config::max_python_process_num when positive, otherwise the CPU core count.
// All processes are forked concurrently via std::async; each successful one is
// appended to the pool. If none could be created a SERVICE_UNAVAILABLE error is
// returned and the pool stays uninitialized; otherwise the pool is marked
// initialized and the health check thread is started (if not already running).
Result<std::shared_ptr<PythonServerManager::VersionedProcessPool>>
PythonServerManager::_ensure_pool_initialized(const PythonVersion& version) {
    auto versioned_pool = _get_or_create_process_pool(version);
    std::lock_guard<std::mutex> pool_guard(versioned_pool->mutex);

    // Nothing to do when a previous call already populated the pool.
    if (versioned_pool->initialized) {
        return versioned_pool;
    }

    // 0 (or a negative value) means "use the CPU core count".
    int max_pool_size;
    if (config::max_python_process_num > 0) {
        max_pool_size = config::max_python_process_num;
    } else {
        max_pool_size = CpuInfo::num_cores();
    }

    LOG(INFO) << "Initializing Python process pool for version " << version.to_string() << " with "
              << max_pool_size
              << " processes (config::max_python_process_num=" << config::max_python_process_num
              << ", CPU cores=" << CpuInfo::num_cores() << ")";

    // Each async task writes only to its own slot, so the slots need no extra locking.
    std::vector<ProcessPtr> spawned(max_pool_size);
    std::vector<std::future<Status>> pending;
    auto spawn_into_slot = [this, &version, &spawned](int slot) -> Status {
        ProcessPtr fresh;
        Status st = fork(version, &fresh);
        if (st.ok()) {
            spawned[slot] = std::move(fresh);
        }
        return st;
    };
    for (int slot = 0; slot < max_pool_size; ++slot) {
        pending.push_back(std::async(std::launch::async, spawn_into_slot, slot));
    }

    int success_count = 0;
    int failure_count = 0;
    const auto init_start_time = std::chrono::steady_clock::now();
    const auto elapsed_ms = [&init_start_time]() {
        return std::chrono::duration_cast<std::chrono::milliseconds>(
                       std::chrono::steady_clock::now() - init_start_time)
                .count();
    };

#ifdef BE_TEST
    constexpr auto progress_log_interval = std::chrono::milliseconds(50);
#else
    constexpr auto progress_log_interval = std::chrono::seconds(20);
#endif

    for (int slot = 0; slot < max_pool_size; ++slot) {
        std::future<Status>& slot_future = pending[slot];

        // Emit a progress line once per interval until this slot's fork has finished.
        while (slot_future.wait_for(progress_log_interval) != std::future_status::ready) {
            const auto total_elapsed_ms = elapsed_ms();
            LOG(INFO) << "Python process pool initialization progress for version "
                      << version.to_string() << ": waiting_slot=" << (slot + 1) << "/"
                      << max_pool_size << ", success=" << success_count
                      << ", failed=" << failure_count << ", elapsed_ms=" << total_elapsed_ms;
        }

        Status st = slot_future.get();
        const bool created = st.ok() && spawned[slot];
        if (!created) {
            failure_count++;
            LOG(WARNING) << "Failed to create Python process " << (slot + 1) << "/"
                         << max_pool_size << ": " << st.to_string();
            continue;
        }
        versioned_pool->processes.emplace_back(std::move(spawned[slot]));
        success_count++;
    }

    if (versioned_pool->processes.empty()) {
        return ResultError(Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
                "Failed to initialize Python process pool: all {} process creation attempts failed",
                max_pool_size));
    }

    const auto total_elapsed_ms = elapsed_ms();
    const std::string failed_suffix =
            failure_count > 0 ? fmt::format(" ({} failed)", failure_count) : "";
    LOG(INFO) << "Python process pool initialized for version " << version.to_string()
              << ": created " << success_count << " processes" << failed_suffix
              << ", elapsed_ms=" << total_elapsed_ms;

    versioned_pool->initialized = true;
    _start_health_check_thread();
    return versioned_pool;
}
// Picks a Python server process from `versioned_pool` for the current request.
//
// Under the pool mutex:
//  1. An empty pool is an InternalError.
//  2. Among the entries that are non-null and alive, the one with the smallest
//     shared_ptr use_count is returned (first one wins on ties). use_count serves
//     as the load metric here; presumably each client keeps a reference to the
//     process it was created with -- confirm against the client classes.
//  3. If no entry is alive, one foreground fork() is attempted and the result
//     replaces the first pool entry; repairing the remaining entries is left to
//     the background health check. A failed fork yields SERVICE_UNAVAILABLE.
//
// Liveness is probed exactly once per entry in a single pass. Probing it inside a
// comparison callback would call is_alive() repeatedly and could give inconsistent
// answers if a process died mid-scan, wrongly triggering the recovery path while
// other live processes were still available.
Status PythonServerManager::_get_process(
        const PythonVersion& version, const std::shared_ptr<VersionedProcessPool>& versioned_pool,
        ProcessPtr* process) {
    std::lock_guard<std::mutex> lock(versioned_pool->mutex);
    std::vector<ProcessPtr>& pool = versioned_pool->processes;

    if (UNLIKELY(pool.empty())) {
        return Status::InternalError("Python process pool is empty for version {}",
                                     version.to_string());
    }

    // Prefer an already-alive process and only load balance inside that alive subset.
    // Dead entries stay in the pool for the background health checker.
    const ProcessPtr* least_loaded = nullptr;
    long least_users = 0;
    for (const ProcessPtr& candidate : pool) {
        if (!candidate || !candidate->is_alive()) {
            continue;
        }
        const long users = candidate.use_count();
        // Strict '<' keeps the earliest entry when several share the minimum.
        if (least_loaded == nullptr || users < least_users) {
            least_loaded = &candidate;
            least_users = users;
        }
    }
    if (least_loaded != nullptr) {
        *process = *least_loaded;
        return Status::OK();
    }

    // Only reached when the pool has no alive process at all. Try one foreground
    // recovery so the caller has a chance to proceed; leave batch repair to health check.
    ProcessPtr replacement;
    Status status = fork(version, &replacement);
    if (!status.ok()) {
        return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
                "Python process pool has no available process for version {}, reason: {}",
                version.to_string(), status.to_string());
    }
    ProcessPtr& first_slot = pool.front();
    first_slot = std::move(replacement);
    *process = first_slot;
    return Status::OK();
}
// Launches one Python server process for `version` and waits until it is ready.
//
// The child is started as `<python executable> -u <flight server script> <base
// unix socket path>` with the current process environment and its stdout piped
// back. Readiness is detected by the appearance of the socket file derived from
// the child's pid; the wait gives up after 5 seconds or as soon as the child is
// no longer running. On success `*process` owns the child and its output stream.
// Any failure (including exceptions thrown while launching) is reported as an
// InternalError.
Status PythonServerManager::fork(const PythonVersion& version, ProcessPtr* process) {
    namespace bp = boost::process;

    std::string interpreter = version.get_executable_path();
    std::string server_script = get_fight_server_path();
    std::string socket_base = get_base_unix_socket_path();
    std::vector<std::string> launch_args = {"-u", server_script, socket_base};
    bp::environment child_env = boost::this_process::environment();
    bp::ipstream child_stdout;

    try {
        bp::child child_proc(
                interpreter, launch_args, bp::std_out > child_stdout, bp::env = child_env,
                bp::on_exit([](int /*exit_code*/, const std::error_code& ec) {
                    if (ec) {
                        LOG(WARNING) << "Python UDF server exited with error: " << ec.message();
                    }
                }));

        // The server signals readiness by creating its socket file.
        const std::string socket_file = get_unix_socket_file_path(child_proc.id());
        const auto deadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(5000);
        bool socket_ready = false;
        while (std::chrono::steady_clock::now() < deadline) {
            struct stat socket_stat;
            if (stat(socket_file.c_str(), &socket_stat) == 0) {
                socket_ready = true;
                break;
            }
            // No point in polling further once the child has gone away.
            if (!child_proc.running()) {
                break;
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }

        if (!socket_ready) {
            // Do not leave a half-started child behind.
            if (child_proc.running()) {
                child_proc.terminate();
                child_proc.wait();
            }
            return Status::InternalError("Python server start failed: socket file not found at {}",
                                         socket_file);
        }

        *process = std::make_shared<PythonUDFProcess>(std::move(child_proc),
                                                      std::move(child_stdout));
    } catch (const std::exception& e) {
        return Status::InternalError("Failed to start Python UDF server: {}", e.what());
    }
    return Status::OK();
}
// Starts the background health check thread unless one already exists.
//
// The thread loops until _shutdown_flag is set: it sleeps on _health_check_cv for
// up to 30 seconds (waking early when the flag is observed set), then recreates
// dead processes and refreshes the memory statistics.
void PythonServerManager::_start_health_check_thread() {
    std::lock_guard<std::mutex> start_guard(_health_check_mutex);
    if (_health_check_thread != nullptr) {
        return;
    }

    LOG(INFO) << "Starting Python process health check thread (interval: 30 seconds)";

    auto health_loop = [this]() {
        const auto shutting_down = [this]() {
            return _shutdown_flag.load(std::memory_order_acquire);
        };

        while (!shutting_down()) {
            // Sleep for one interval, or less if shutdown is signalled.
            {
                std::unique_lock<std::mutex> wait_lock(_health_check_mutex);
                _health_check_cv.wait_for(wait_lock, std::chrono::seconds(30), shutting_down);
            }

            // Skip the maintenance work when we were woken up for shutdown.
            if (shutting_down()) {
                break;
            }

            _check_and_recreate_processes();
            _refresh_memory_stats();
        }

        LOG(INFO) << "Python process health check thread exiting";
    };

    _health_check_thread = std::make_unique<std::thread>(std::move(health_loop));
}
// Health check pass: for every pool, replaces dead processes with freshly forked
// ones while holding that pool's mutex.
//
// Null entries are skipped. A dead entry whose replacement fork fails is removed
// from the pool. If such removals drain a pool completely, its `initialized` flag
// is cleared so the next request re-runs pool initialization; otherwise the pool
// would stay empty (nothing left for this pass to repair) and every later request
// would fail with "pool is empty". A summary line is logged when at least one dead
// process was seen.
void PythonServerManager::_check_and_recreate_processes() {
    int total_checked = 0;
    int total_dead = 0;
    int total_recreated = 0;

    for (auto& [version, versioned_pool] : _snapshot_process_pools()) {
        std::lock_guard<std::mutex> lock(versioned_pool->mutex);
        auto& pool = versioned_pool->processes;
        bool dropped_entry = false;

        // Iterator-based erase: advance explicitly so removing an entry never
        // depends on unsigned index wraparound.
        for (auto it = pool.begin(); it != pool.end();) {
            if (!*it) {
                ++it;
                continue;
            }
            total_checked++;
            if ((*it)->is_alive()) {
                ++it;
                continue;
            }

            total_dead++;
            LOG(WARNING) << "Detected dead Python process (pid=" << (*it)->get_child_pid()
                         << ", version=" << version.to_string() << "), recreating...";

            ProcessPtr new_process;
            Status s = fork(version, &new_process);
            if (s.ok()) {
                *it = std::move(new_process);
                total_recreated++;
                LOG(INFO) << "Successfully recreated Python process for version "
                          << version.to_string();
                ++it;
            } else {
                LOG(ERROR) << "Failed to recreate Python process for version "
                           << version.to_string() << ": " << s.to_string();
                // erase() returns the iterator to the element that followed the removed one.
                it = pool.erase(it);
                dropped_entry = true;
            }
        }

        if (dropped_entry && pool.empty()) {
            // Same state as a failed first initialization: let the next request retry.
            versioned_pool->initialized = false;
            LOG(WARNING) << "Python process pool for version " << version.to_string()
                         << " is empty after failed recreation; it will be re-initialized on"
                         << " the next request";
        }
    }

    if (total_dead > 0) {
        LOG(INFO) << "Health check completed: checked=" << total_checked << ", dead=" << total_dead
                  << ", recreated=" << total_recreated;
    }
}
void PythonServerManager::shutdown() {
// Signal health check thread to stop
_shutdown_flag.store(true, std::memory_order_release);
_health_check_cv.notify_one();
if (_health_check_thread && _health_check_thread->joinable()) {
_health_check_thread->join();
_health_check_thread.reset();
}
// Shutdown all processes
for (auto& [version, versioned_pool] : _snapshot_process_pools()) {
std::lock_guard<std::mutex> lock(versioned_pool->mutex);
auto& pool = versioned_pool->processes;
for (auto& process : pool) {
if (process) {
process->shutdown();
}
}
pool.clear();
versioned_pool->initialized = false;
}
{
std::lock_guard<std::mutex> lock(_pools_mutex);
_process_pools.clear();
}
}
// Reads the resident set size of process `pid` from /proc/{pid}/statm and stores
// it, converted to bytes, in `*rss_bytes`.
//
// Returns InternalError if the file cannot be opened or parsed, or if the system
// page size cannot be determined; `*rss_bytes` is left untouched in those cases.
Status PythonServerManager::_read_process_memory(pid_t pid, size_t* rss_bytes) {
    // Format of statm: size resident shared text lib data dt (all in pages)
    std::string statm_path = fmt::format("/proc/{}/statm", pid);
    std::ifstream statm_file(statm_path);
    if (!statm_file.is_open()) {
        return Status::InternalError("Cannot open {}", statm_path);
    }

    size_t size_pages = 0, rss_pages = 0;
    // We only care about RSS; the leading total-size field is read and ignored.
    statm_file >> size_pages >> rss_pages;
    if (statm_file.fail()) {
        return Status::InternalError("Failed to read {}", statm_path);
    }

    // sysconf reports failure as -1; multiplying by that after conversion to an
    // unsigned type would produce a meaningless, huge byte count.
    const long page_size = sysconf(_SC_PAGESIZE);
    if (page_size <= 0) {
        return Status::InternalError("Failed to query page size while reading {}", statm_path);
    }

    // Convert pages to bytes
    *rss_bytes = rss_pages * static_cast<size_t>(page_size);
    return Status::OK();
}
void PythonServerManager::_refresh_memory_stats() {
int64_t total_rss = 0;
for (const auto& [version, versioned_pool] : _snapshot_process_pools()) {
std::lock_guard<std::mutex> lock(versioned_pool->mutex);
const auto& pool = versioned_pool->processes;
for (const auto& process : pool) {
if (!process || !process->is_alive()) continue;
size_t rss_bytes = 0;
Status s = _read_process_memory(process->get_child_pid(), &rss_bytes);
if (s.ok()) {
total_rss += rss_bytes;
} else [[unlikely]] {
LOG(WARNING) << "Failed to read memory info for Python process (pid="
<< process->get_child_pid() << "): " << s.to_string();
}
}
}
_mem_tracker.set_consumption(total_rss);
LOG(INFO) << _mem_tracker.log_usage();
if (config::python_udf_processes_memory_limit_bytes > 0 &&
total_rss > config::python_udf_processes_memory_limit_bytes) {
LOG(WARNING) << "Python UDF process memory usage exceeds limit: rss_bytes=" << total_rss
<< ", limit_bytes=" << config::python_udf_processes_memory_limit_bytes;
}
}
Status PythonServerManager::clear_module_cache(const std::string& location) {
if (location.empty()) {
return Status::InvalidArgument("Empty location for clear_module_cache");
}
std::string body = fmt::format(R"({{"location": "{}"}})", location);
return _broadcast_action_to_processes("clear_module_cache", body,
fmt::format("location={}", location));
}
// Asks every alive Python process to run the "clear_udaf_state_cache" action for
// `function_id`. Best effort: a failed broadcast is only reported through
// WARN_IF_ERROR and is not propagated to the caller.
void PythonServerManager::clear_udaf_state_cache(int64_t function_id) {
    const std::string payload = fmt::format(R"({{"function_id": {}}})", function_id);
    const std::string target = fmt::format("function_id={}", function_id);
    WARN_IF_ERROR(_broadcast_action_to_processes("clear_udaf_state_cache", payload, target),
                  "failed to clear Python UDAF state cache");
}
// Sends the Arrow Flight action `action_type` with payload `body` to every alive
// process in every pool, holding each pool's mutex while its processes are
// contacted.
//
// For each process a Flight client is connected to the process URI, the action is
// issued and the first result is read; a non-null first result counts as success.
// Every failure is counted and logged with the process pid, the failing stage and
// the underlying reason, so the aggregated error returned at the end can be traced
// back to its cause. `log_name` only labels log and error messages.
//
// Returns OK when there is no alive process or when all processes succeeded, and
// InternalError when at least one process failed.
Status PythonServerManager::_broadcast_action_to_processes(const std::string& action_type,
                                                           const std::string& body,
                                                           const std::string& log_name) {
    int success_count = 0;
    int fail_count = 0;
    bool has_active_process = false;

    // Counts one failure and records why it happened.
    auto record_failure = [&](const ProcessPtr& failed_process, const char* stage,
                              const std::string& reason) {
        fail_count++;
        LOG(WARNING) << action_type << " failed for " << log_name
                     << " on Python process (pid=" << failed_process->get_child_pid()
                     << ") at " << stage << ": " << reason;
    };

    for (auto& [version, versioned_pool] : _snapshot_process_pools()) {
        std::lock_guard<std::mutex> lock(versioned_pool->mutex);
        auto& pool = versioned_pool->processes;
        for (auto& process : pool) {
            if (!process || !process->is_alive()) {
                continue;
            }
            has_active_process = true;
            try {
                auto loc_result = arrow::flight::Location::Parse(process->get_uri());
                if (!loc_result.ok()) [[unlikely]] {
                    record_failure(process, "parse location", loc_result.status().ToString());
                    continue;
                }

                auto client_result = arrow::flight::FlightClient::Connect(*loc_result);
                if (!client_result.ok()) [[unlikely]] {
                    record_failure(process, "connect", client_result.status().ToString());
                    continue;
                }
                auto client = std::move(*client_result);

                arrow::flight::Action action;
                action.type = action_type;
                action.body = arrow::Buffer::FromString(body);

                auto result_stream = client->DoAction(action);
                if (!result_stream.ok()) {
                    record_failure(process, "do action", result_stream.status().ToString());
                    continue;
                }

                // Only the first result is inspected; a missing one counts as failure.
                auto result = (*result_stream)->Next();
                if (result.ok() && *result) {
                    success_count++;
                } else {
                    record_failure(process, "read result",
                                   result.ok() ? std::string("empty result stream")
                                               : result.status().ToString());
                }
            } catch (const std::exception& e) {
                record_failure(process, "exception", e.what());
            } catch (...) {
                record_failure(process, "exception", "unknown exception");
            }
        }
    }

    // Nothing to notify: treated as success without logging.
    if (!has_active_process) {
        return Status::OK();
    }

    LOG(INFO) << action_type << " completed for " << log_name << ", success=" << success_count
              << ", failed=" << fail_count;
    if (fail_count > 0) {
        return Status::InternalError("{} failed for {}, success={}, failed={}", action_type,
                                     log_name, success_count, fail_count);
    }
    return Status::OK();
}
// Explicit template instantiations of get_client for the UDF, UDAF and UDTF client
// types. The get_client template is defined in this file, so these instantiations
// emit its code here for exactly these three client types; presumably callers in
// other translation units link against them -- confirm against the header.
template Status PythonServerManager::get_client<PythonUDFClient>(
const PythonUDFMeta& func_meta, const PythonVersion& version,
std::shared_ptr<PythonUDFClient>* client,
const std::shared_ptr<arrow::Schema>& data_schema);
template Status PythonServerManager::get_client<PythonUDAFClient>(
const PythonUDFMeta& func_meta, const PythonVersion& version,
std::shared_ptr<PythonUDAFClient>* client,
const std::shared_ptr<arrow::Schema>& data_schema);
template Status PythonServerManager::get_client<PythonUDTFClient>(
const PythonUDFMeta& func_meta, const PythonVersion& version,
std::shared_ptr<PythonUDTFClient>* client,
const std::shared_ptr<arrow::Schema>& data_schema);
} // namespace doris