blob: 646e1e79039b5b9e71d5ccc232c23502e2517ffd [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "udf/python/python_server.h"
#include <arrow/type_fwd.h>
#include <butil/fd_utility.h>
#include <dirent.h>
#include <fmt/core.h>
#include <sys/poll.h>
#include <sys/stat.h>
#include <boost/asio.hpp>
#include <boost/process.hpp>
#include <fstream>
#include "arrow/flight/client.h"
#include "common/config.h"
#include "udf/python/python_udaf_client.h"
#include "udf/python/python_udf_client.h"
#include "udf/python/python_udtf_client.h"
#include "util/cpu_info.h"
namespace doris {
// Creates a client of type T that talks to one of the pooled Python server
// processes for `version`.
//
// The pool for `version` is initialized on demand, a process is selected via
// get_process(), and that process is handed over to T::create(). Only
// PythonUDAFClient::create() receives `data_schema`; for the other client
// types the schema argument is not used.
//
// Returns the first non-OK status produced by pool initialization, process
// selection or T::create(); otherwise OK.
template <typename T>
Status PythonServerManager::get_client(const PythonUDFMeta& func_meta, const PythonVersion& version,
                                       std::shared_ptr<T>* client,
                                       const std::shared_ptr<arrow::Schema>& data_schema) {
    // The pool must exist before a process can be picked from it.
    RETURN_IF_ERROR(ensure_pool_initialized(version));

    ProcessPtr selected;
    RETURN_IF_ERROR(get_process(version, &selected));

    // The factory's status is the final result of this call.
    if constexpr (std::is_same_v<T, PythonUDAFClient>) {
        return T::create(func_meta, std::move(selected), data_schema, client);
    } else {
        return T::create(func_meta, std::move(selected), client);
    }
}
// Builds the process pool for `version` if it has not been built yet.
//
// The whole operation runs while holding `_pools_mutex`. Process creation is
// fanned out with std::async (one task per process) and every task is joined
// before this function returns, so the by-reference captures stay valid.
//
// Partial success is accepted: the version is marked initialized as long as at
// least one process could be started. When every attempt fails an
// InternalError is returned and the version stays uninitialized.
// On success the health check thread is started (if not already running).
Status PythonServerManager::ensure_pool_initialized(const PythonVersion& version) {
    std::lock_guard<std::mutex> guard(_pools_mutex);

    // Already done for this version: nothing to do.
    if (_initialized_versions.count(version) != 0) {
        return Status::OK();
    }

    std::vector<ProcessPtr>& pool = _process_pools[version];

    // A configured value of 0 (or less) falls back to the CPU core count.
    int max_pool_size;
    if (config::max_python_process_num > 0) {
        max_pool_size = config::max_python_process_num;
    } else {
        max_pool_size = CpuInfo::num_cores();
    }

    LOG(INFO) << "Initializing Python process pool for version " << version.to_string() << " with "
              << max_pool_size
              << " processes (config::max_python_process_num=" << config::max_python_process_num
              << ", CPU cores=" << CpuInfo::num_cores() << ")";

    // Each task writes only to its own slot, so the slots need no extra locking.
    std::vector<ProcessPtr> spawned_slots(max_pool_size);
    std::vector<std::future<Status>> pending;
    for (int slot = 0; slot < max_pool_size; ++slot) {
        pending.emplace_back(
                std::async(std::launch::async, [this, &version, &spawned_slots, slot]() {
                    ProcessPtr spawned;
                    Status st = fork(version, &spawned);
                    if (st.ok()) {
                        spawned_slots[slot] = std::move(spawned);
                    }
                    return st;
                }));
    }

    // Join every task and move the successfully started processes into the pool.
    int success_count = 0;
    int failure_count = 0;
    for (int slot = 0; slot < max_pool_size; ++slot) {
        Status st = pending[slot].get();
        if (!st.ok() || !spawned_slots[slot]) {
            ++failure_count;
            LOG(WARNING) << "Failed to create Python process " << (slot + 1) << "/"
                         << max_pool_size << ": " << st.to_string();
            continue;
        }
        pool.push_back(std::move(spawned_slots[slot]));
        ++success_count;
    }

    if (pool.empty()) {
        return Status::InternalError(
                "Failed to initialize Python process pool: all {} process creation attempts failed",
                max_pool_size);
    }

    std::string failed_suffix;
    if (failure_count > 0) {
        failed_suffix = fmt::format(" ({} failed)", failure_count);
    }
    LOG(INFO) << "Python process pool initialized for version " << version.to_string()
              << ": created " << success_count << " processes" << failed_suffix;

    _initialized_versions.insert(version);
    _start_health_check_thread();
    return Status::OK();
}
// Picks the pooled process for `version` that currently has the fewest owners
// and stores it in `*process`.
//
// Load is approximated by the shared pointer's use_count(): the pool itself
// holds one reference, every additional holder adds one more. On ties the
// process that appears first in the pool wins.
//
// Returns InternalError when the pool for `version` holds no process.
Status PythonServerManager::get_process(const PythonVersion& version, ProcessPtr* process) {
    std::lock_guard<std::mutex> guard(_pools_mutex);

    std::vector<ProcessPtr>& pool = _process_pools[version];
    if (UNLIKELY(pool.empty())) {
        return Status::InternalError("Python process pool is empty for version {}",
                                     version.to_string());
    }

    // Linear scan keeping the first entry with the strictly smallest use_count().
    const ProcessPtr* least_loaded = &pool.front();
    for (const ProcessPtr& candidate : pool) {
        if (candidate.use_count() < least_loaded->use_count()) {
            least_loaded = &candidate;
        }
    }

    *process = *least_loaded;
    return Status::OK();
}
// Launches one Python server process for `version` and waits until it is ready.
//
// The child is started as `<python executable> -u <flight server script>
// <base unix socket path>` with a copy of the current process environment and
// its stdout redirected into a pipe stream. Readiness is detected by polling
// (every 1 ms, for at most 5000 ms) for the existence of the file returned by
// get_unix_socket_file_path(<child pid>). Polling stops early when the child
// is no longer running.
//
// On success `*process` receives a PythonUDFProcess that owns the child and its
// stdout stream. If the file never shows up, a still-running child is
// terminated and an InternalError is returned. Any std::exception thrown while
// spawning or supervising the child is converted into an InternalError.
Status PythonServerManager::fork(const PythonVersion& version, ProcessPtr* process) {
    namespace bp = boost::process;

    std::string python_executable_path = version.get_executable_path();
    std::string flight_server_path = get_fight_server_path();
    std::string base_unix_socket_path = get_base_unix_socket_path();
    std::vector<std::string> args = {"-u", flight_server_path, base_unix_socket_path};

    bp::environment env = boost::this_process::environment();
    bp::ipstream child_output;

    try {
        bp::child server(python_executable_path, args, bp::std_out > child_output, bp::env = env,
                         bp::on_exit([](int /*exit_code*/, const std::error_code& ec) {
                             if (ec) {
                                 LOG(WARNING) << "Python UDF server exited with error: "
                                              << ec.message();
                             }
                         }));

        // The server signals readiness by creating its socket file.
        const std::string expected_socket_path = get_unix_socket_file_path(server.id());
        const auto deadline =
                std::chrono::steady_clock::now() + std::chrono::milliseconds(5000);

        bool socket_ready = false;
        while (std::chrono::steady_clock::now() < deadline) {
            struct stat file_info;
            if (::stat(expected_socket_path.c_str(), &file_info) == 0) {
                socket_ready = true;
                break;
            }
            // No point in waiting any longer once the child is gone.
            if (!server.running()) {
                break;
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }

        if (!socket_ready) {
            if (server.running()) {
                server.terminate();
                server.wait();
            }
            return Status::InternalError("Python server start failed: socket file not found at {}",
                                         expected_socket_path);
        }

        *process = std::make_shared<PythonUDFProcess>(std::move(server), std::move(child_output));
    } catch (const std::exception& e) {
        return Status::InternalError("Failed to start Python UDF server: {}", e.what());
    }

    return Status::OK();
}
// Starts the background health check thread unless one already exists.
//
// The thread loops until `_shutdown_flag` is set. Each round it waits on
// `_health_check_cv` for up to 30 seconds (waking early when shutdown is
// requested), then recreates dead processes and refreshes the memory
// statistics.
void PythonServerManager::_start_health_check_thread() {
    if (_health_check_thread != nullptr) {
        return;
    }

    LOG(INFO) << "Starting Python process health check thread (interval: 30 seconds)";

    auto health_loop = [this]() {
        const auto stop_requested = [this]() {
            return _shutdown_flag.load(std::memory_order_acquire);
        };

        while (!stop_requested()) {
            // Sleep for one interval, or less if shutdown is signalled.
            {
                std::unique_lock<std::mutex> wait_lock(_health_check_mutex);
                _health_check_cv.wait_for(wait_lock, std::chrono::seconds(30), stop_requested);
            }

            if (stop_requested()) {
                break;
            }

            _check_and_recreate_processes();
            _refresh_memory_stats();
        }

        LOG(INFO) << "Python process health check thread exiting";
    };

    _health_check_thread = std::make_unique<std::thread>(std::move(health_loop));
}
void PythonServerManager::_check_and_recreate_processes() {
std::lock_guard<std::mutex> lock(_pools_mutex);
int total_checked = 0;
int total_dead = 0;
int total_recreated = 0;
for (auto& [version, pool] : _process_pools) {
for (size_t i = 0; i < pool.size(); ++i) {
auto& process = pool[i];
if (!process) continue;
total_checked++;
if (!process->is_alive()) {
total_dead++;
LOG(WARNING) << "Detected dead Python process (pid=" << process->get_child_pid()
<< ", version=" << version.to_string() << "), recreating...";
ProcessPtr new_process;
Status s = fork(version, &new_process);
if (s.ok()) {
pool[i] = std::move(new_process);
total_recreated++;
LOG(INFO) << "Successfully recreated Python process for version "
<< version.to_string();
} else {
LOG(ERROR) << "Failed to recreate Python process for version "
<< version.to_string() << ": " << s.to_string();
pool.erase(pool.begin() + i);
--i;
}
}
}
}
if (total_dead > 0) {
LOG(INFO) << "Health check completed: checked=" << total_checked << ", dead=" << total_dead
<< ", recreated=" << total_recreated;
}
}
void PythonServerManager::shutdown() {
// Signal health check thread to stop
_shutdown_flag.store(true, std::memory_order_release);
_health_check_cv.notify_one();
if (_health_check_thread && _health_check_thread->joinable()) {
_health_check_thread->join();
_health_check_thread.reset();
}
// Shutdown all processes
std::lock_guard<std::mutex> lock(_pools_mutex);
for (auto& [version, pool] : _process_pools) {
for (auto& process : pool) {
if (process) {
process->shutdown();
}
}
}
_process_pools.clear();
}
// Reads the resident set size of process `pid` from /proc/<pid>/statm and
// stores it, converted to bytes, in `*rss_bytes`.
//
// statm layout: size resident shared text lib data dt (all counted in pages).
// Only the second field is used; the first is read just to get past it.
//
// Returns InternalError when the file cannot be opened or the two leading
// fields cannot be parsed.
Status PythonServerManager::_read_process_memory(pid_t pid, size_t* rss_bytes) {
    const std::string statm_path = fmt::format("/proc/{}/statm", pid);

    std::ifstream statm_file(statm_path);
    if (!statm_file.is_open()) {
        return Status::InternalError("Cannot open {}", statm_path);
    }

    size_t size_pages = 0;
    size_t rss_pages = 0;
    if (!(statm_file >> size_pages >> rss_pages)) {
        return Status::InternalError("Failed to read {}", statm_path);
    }

    // Page count -> byte count.
    const long page_size = sysconf(_SC_PAGESIZE);
    *rss_bytes = rss_pages * page_size;
    return Status::OK();
}
void PythonServerManager::_refresh_memory_stats() {
std::lock_guard<std::mutex> lock(_pools_mutex);
int64_t total_rss = 0;
for (const auto& [version, pool] : _process_pools) {
for (const auto& process : pool) {
if (!process || !process->is_alive()) continue;
size_t rss_bytes = 0;
Status s = _read_process_memory(process->get_child_pid(), &rss_bytes);
if (s.ok()) {
total_rss += rss_bytes;
} else [[unlikely]] {
LOG(WARNING) << "Failed to read memory info for Python process (pid="
<< process->get_child_pid() << "): " << s.to_string();
}
}
}
_mem_tracker.set_consumption(total_rss);
LOG(INFO) << _mem_tracker.log_usage();
if (config::python_udf_processes_memory_limit_bytes > 0 &&
total_rss > config::python_udf_processes_memory_limit_bytes) {
LOG(WARNING) << "Python UDF process memory usage exceeds limit: rss_bytes=" << total_rss
<< ", limit_bytes=" << config::python_udf_processes_memory_limit_bytes;
}
}
namespace {

// Escapes `raw` so that it can be placed between double quotes inside a JSON
// document: quotes, backslashes and control characters (< 0x20) are escaped,
// every other byte is copied unchanged.
std::string escape_python_udf_json_string(const std::string& raw) {
    static constexpr char kHexDigits[] = "0123456789abcdef";

    std::string escaped;
    escaped.reserve(raw.size());
    for (const char ch : raw) {
        switch (ch) {
        case '"':
            escaped += "\\\"";
            break;
        case '\\':
            escaped += "\\\\";
            break;
        case '\b':
            escaped += "\\b";
            break;
        case '\f':
            escaped += "\\f";
            break;
        case '\n':
            escaped += "\\n";
            break;
        case '\r':
            escaped += "\\r";
            break;
        case '\t':
            escaped += "\\t";
            break;
        default: {
            const auto byte = static_cast<unsigned char>(ch);
            if (byte < 0x20) {
                // Remaining control characters need the \u00XX form.
                escaped += "\\u00";
                escaped += kHexDigits[(byte >> 4) & 0x0F];
                escaped += kHexDigits[byte & 0x0F];
            } else {
                escaped += ch;
            }
            break;
        }
        }
    }
    return escaped;
}

} // namespace

// Sends a "clear_module_cache" Flight action for `location` to every live
// pooled Python process.
//
// The action body is the JSON object {"location": "<location>"}. `location` is
// JSON-escaped first, so a quote, backslash or control character in it cannot
// break or alter the structure of the request. Locations without such
// characters produce exactly the same bytes as before.
//
// Runs while holding `_pools_mutex`. A process counts as failed when its URI
// cannot be parsed, the connection or the action call fails, the first result
// is missing, or an exception is thrown along the way.
//
// Returns InvalidArgument for an empty `location`, OK when there is no live
// process or every live process succeeded, and InternalError (with the
// success/failure counts) when at least one process failed.
Status PythonServerManager::clear_module_cache(const std::string& location) {
    if (location.empty()) {
        return Status::InvalidArgument("Empty location for clear_module_cache");
    }

    std::lock_guard<std::mutex> lock(_pools_mutex);

    const std::string body =
            fmt::format(R"({{"location": "{}"}})", escape_python_udf_json_string(location));

    int success_count = 0;
    int fail_count = 0;
    bool has_active_process = false;

    for (auto& [version, pool] : _process_pools) {
        for (auto& process : pool) {
            if (!process || !process->is_alive()) {
                continue;
            }
            has_active_process = true;
            try {
                auto loc_result = arrow::flight::Location::Parse(process->get_uri());
                if (!loc_result.ok()) [[unlikely]] {
                    fail_count++;
                    continue;
                }

                auto client_result = arrow::flight::FlightClient::Connect(*loc_result);
                if (!client_result.ok()) [[unlikely]] {
                    fail_count++;
                    continue;
                }
                auto client = std::move(*client_result);

                arrow::flight::Action action;
                action.type = "clear_module_cache";
                action.body = arrow::Buffer::FromString(body);

                auto result_stream = client->DoAction(action);
                if (!result_stream.ok()) {
                    fail_count++;
                    continue;
                }

                // Only the first result is inspected; a non-null one means success.
                auto result = (*result_stream)->Next();
                if (result.ok() && *result) {
                    success_count++;
                } else {
                    fail_count++;
                }
            } catch (...) {
                // Best effort: one misbehaving process must not abort the sweep.
                fail_count++;
            }
        }
    }

    // Nothing was contacted, so there is nothing to report.
    if (!has_active_process) {
        return Status::OK();
    }

    LOG(INFO) << "clear_module_cache completed for location=" << location
              << ", success=" << success_count << ", failed=" << fail_count;

    if (fail_count > 0) {
        return Status::InternalError(
                "clear_module_cache failed for location={}, success={}, failed={}", location,
                success_count, fail_count);
    }
    return Status::OK();
}
// Explicit template instantiations of get_client() for the UDF, UDAF and UDTF
// client types. The template is defined in this source file, so these
// instantiations are what make the three specializations available to code in
// other translation units.
template Status PythonServerManager::get_client<PythonUDFClient>(
const PythonUDFMeta& func_meta, const PythonVersion& version,
std::shared_ptr<PythonUDFClient>* client,
const std::shared_ptr<arrow::Schema>& data_schema);
template Status PythonServerManager::get_client<PythonUDAFClient>(
const PythonUDFMeta& func_meta, const PythonVersion& version,
std::shared_ptr<PythonUDAFClient>* client,
const std::shared_ptr<arrow::Schema>& data_schema);
template Status PythonServerManager::get_client<PythonUDTFClient>(
const PythonUDFMeta& func_meta, const PythonVersion& version,
std::shared_ptr<PythonUDTFClient>* client,
const std::shared_ptr<arrow::Schema>& data_schema);
} // namespace doris