blob: c6c510fe6f5373a45e7602ba7c5fe3fe26ff5ccd [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "udf/python/python_server.h"
#include <gtest/gtest.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <filesystem>
#include <fstream>
#include <string>
#include "common/config.h"
#include "common/status.h"
#include "udf/python/python_env.h"
#include "udf/python/python_udf_client.h"
#include "udf/python/python_udf_meta.h"
#include "udf/python/python_udf_runtime.h"
namespace doris {
namespace fs = std::filesystem;
// Test fixture for PythonServerManager tests.
//
// SetUp() creates a scratch directory under the system temp directory and
// snapshots DORIS_HOME and config::max_python_process_num; TearDown() restores
// both and removes the scratch directory, so individual tests may change them.
class PythonServerTest : public ::testing::Test {
protected:
    // Scratch directory for the current test; removed in TearDown().
    std::string test_dir_;
    // Snapshot of DORIS_HOME taken in SetUp(). The value is copied instead of
    // keeping the pointer returned by getenv(), because POSIX allows a later
    // setenv()/unsetenv() call (which the tests perform) to invalidate it.
    bool had_doris_home_ = false;
    std::string original_doris_home_;
    // Snapshot of config::max_python_process_num taken in SetUp().
    int original_max_python_process_num_ = 0;

    void SetUp() override {
        test_dir_ = fs::temp_directory_path().string() + "/python_server_test_" +
                    std::to_string(getpid()) + "_" + std::to_string(rand());
        fs::create_directories(test_dir_);

        if (const char* doris_home = std::getenv("DORIS_HOME")) {
            had_doris_home_ = true;
            original_doris_home_ = doris_home;
        } else {
            had_doris_home_ = false;
            original_doris_home_.clear();
        }
        original_max_python_process_num_ = config::max_python_process_num;
    }

    void TearDown() override {
        // Restore configuration
        config::max_python_process_num = original_max_python_process_num_;

        // Restore the environment before touching the filesystem so that a
        // throwing remove_all() cannot leave DORIS_HOME modified.
        if (had_doris_home_) {
            setenv("DORIS_HOME", original_doris_home_.c_str(), 1);
        } else {
            unsetenv("DORIS_HOME");
        }

        if (!test_dir_.empty() && fs::exists(test_dir_)) {
            fs::remove_all(test_dir_);
        }
    }

    // Writes an executable bash script to <test_dir_>/bin/python3 that stands in
    // for a Python interpreter, and returns its path.
    //
    // Script behavior:
    //   1. If the first argument is --version, print "Python <version>" and exit 0.
    //   2. Otherwise take the third argument as the socket prefix (expected
    //      argument format: -u script.py grpc+unix:///tmp/doris_python_udf),
    //      strip the "grpc+unix://" scheme, touch "<prefix>_<pid>.sock", and
    //      loop until it receives TERM or INT, at which point it removes the file.
    //
    // @param version version string printed in response to --version
    // @return path of the generated script
    std::string create_fake_python_with_socket_creation(const std::string& version = "3.9.16") {
        std::string bin_dir = test_dir_ + "/bin";
        std::string python_path = bin_dir + "/python3";
        fs::create_directories(bin_dir);

        std::ofstream ofs(python_path);
        ofs << "#!/bin/bash\n";
        ofs << "if [ \"$1\" = \"--version\" ]; then\n";
        ofs << " echo 'Python " << version << "'\n";
        ofs << " exit 0\n";
        ofs << "fi\n";
        // Extract socket path prefix from args and create the socket file
        ofs << "SOCKET_PREFIX=\"$3\"\n";
        ofs << "# Extract path part (strip grpc+unix://)\n";
        ofs << "SOCKET_BASE=\"${SOCKET_PREFIX#grpc+unix://}\"\n";
        ofs << "SOCKET_FILE=\"${SOCKET_BASE}_$$.sock\"\n";
        ofs << "# Create socket file\n";
        ofs << "touch \"$SOCKET_FILE\"\n";
        ofs << "# Wait to be terminated\n";
        ofs << "trap 'rm -f \"$SOCKET_FILE\"; exit 0' TERM INT\n";
        ofs << "while true; do sleep 1; done\n";
        ofs.close();

        fs::permissions(python_path, fs::perms::owner_all);
        return python_path;
    }

    // Points DORIS_HOME at the scratch directory and creates
    // <test_dir_>/plugins/python_udf/python_server.py with placeholder content.
    void setup_doris_home() {
        setenv("DORIS_HOME", test_dir_.c_str(), 1);
        std::string plugin_dir = test_dir_ + "/plugins/python_udf";
        fs::create_directories(plugin_dir);
        // Placeholder python_server.py; the fake interpreter script never reads it.
        std::ofstream ofs(plugin_dir + "/python_server.py");
        ofs << "# fake server\n";
        ofs.close();
    }
};
// ============================================================================
// PythonServerManager::instance() - singleton tests
// ============================================================================
// Two consecutive instance() calls must refer to the same object.
TEST_F(PythonServerTest, SingletonReturnsSameInstance) {
    PythonServerManager* const first = &PythonServerManager::instance();
    PythonServerManager* const second = &PythonServerManager::instance();
    EXPECT_EQ(first, second);
}
// ============================================================================
// PythonServerManager::get_process() - process retrieval tests
// ============================================================================
// get_process() on a manager whose pool was never initialized must fail with a
// message containing "pool is empty" and must leave the output pointer null.
TEST_F(PythonServerTest, GetProcessFromEmptyPoolReturnsError) {
    PythonServerManager manager;
    PythonVersion py_version("3.9.16", "/fake/path", "/fake/python");
    ProcessPtr proc;

    Status result = manager.get_process(py_version, &proc);

    EXPECT_FALSE(result.ok());
    const std::string message = result.to_string();
    const bool mentions_empty_pool = message.find("pool is empty") != std::string::npos;
    EXPECT_TRUE(mentions_empty_pool);
    EXPECT_EQ(proc, nullptr);
}
// ============================================================================
// PythonServerManager::fork() - process creation tests
// ============================================================================
// fork() must fail and leave the output pointer null when the interpreter path
// does not exist on disk.
TEST_F(PythonServerTest, ForkWithNonExistentPythonReturnsError) {
    PythonServerManager manager;
    const std::string missing_interpreter = test_dir_ + "/nonexistent_python";
    PythonVersion bogus_version("3.9.16", test_dir_, missing_interpreter);
    ProcessPtr spawned;

    Status fork_result = manager.fork(bogus_version, &spawned);

    EXPECT_FALSE(fork_result.ok());
    EXPECT_EQ(spawned, nullptr);
}
// fork() must fail when DORIS_HOME points at a directory in which no
// python_server.py has been created.
TEST_F(PythonServerTest, ForkWithMissingFlightServerReturnsError) {
    PythonServerManager manager;

    // The scratch directory is used as DORIS_HOME without calling
    // setup_doris_home(), so the server script is absent.
    setenv("DORIS_HOME", test_dir_.c_str(), 1);

    // Fake interpreter: a bash script that exits with status 1 right away.
    const std::string bin_dir = test_dir_ + "/bin";
    const std::string interpreter = bin_dir + "/python3";
    fs::create_directories(bin_dir);
    std::ofstream interpreter_out(interpreter);
    interpreter_out << "#!/bin/bash\nexit 1";
    interpreter_out.close();
    fs::permissions(interpreter, fs::perms::owner_all);

    PythonVersion py_version("3.9.16", test_dir_, interpreter);
    ProcessPtr spawned;
    Status fork_result = manager.fork(py_version, &spawned);

    // Verify: the flight server script does not exist, so fork should fail.
    EXPECT_FALSE(fork_result.ok());
    EXPECT_EQ(spawned, nullptr);
}
// fork() must fail when the interpreter exits immediately without creating a
// socket file; the error text is expected to mention "socket" or "start".
TEST_F(PythonServerTest, ForkWithProcessThatExitsImmediatelyReturnsError) {
    PythonServerManager manager;

    // Set DORIS_HOME
    setenv("DORIS_HOME", test_dir_.c_str(), 1);

    // Lay out the flight server directory with a python_server.py in it.
    const std::string plugin_dir = test_dir_ + "/plugins/python_udf";
    fs::create_directories(plugin_dir);
    const std::string server_script = plugin_dir + "/python_server.py";
    std::ofstream server_out(server_script);
    server_out << "import sys; sys.exit(1)";
    server_out.close();

    // Fake interpreter: a bash script that exits with status 1 right away and
    // never creates a socket file.
    const std::string bin_dir = test_dir_ + "/bin";
    const std::string interpreter = bin_dir + "/python3";
    fs::create_directories(bin_dir);
    std::ofstream interpreter_out(interpreter);
    interpreter_out << "#!/bin/bash\nexit 1";
    interpreter_out.close();
    fs::permissions(interpreter, fs::perms::owner_all);

    PythonVersion py_version("3.9.16", test_dir_, interpreter);
    ProcessPtr spawned;
    Status fork_result = manager.fork(py_version, &spawned);

    // Verify: the process is gone before a socket file appears, so fork fails.
    EXPECT_FALSE(fork_result.ok());

    // The error message should contain socket- or startup-related wording.
    const std::string message = fork_result.to_string();
    const bool mentions_socket = message.find("socket") != std::string::npos;
    const bool mentions_start = message.find("start") != std::string::npos;
    EXPECT_TRUE(mentions_socket || mentions_start);
}
// ============================================================================
// PythonServerManager::ensure_pool_initialized() - pool initialization tests
// ============================================================================
// ensure_pool_initialized() must fail for a version whose paths do not exist,
// and the error text must contain "Failed" or "failed".
TEST_F(PythonServerTest, EnsurePoolInitializedWithInvalidVersionFails) {
    PythonServerManager manager;
    PythonVersion bogus_version("3.99.99", "/non/existent/path", "/non/existent/python");

    Status init_result = manager.ensure_pool_initialized(bogus_version);

    EXPECT_FALSE(init_result.ok());
    const std::string message = init_result.to_string();
    const bool reports_failure = message.find("Failed") != std::string::npos ||
                                 message.find("failed") != std::string::npos;
    EXPECT_TRUE(reports_failure);
}
// ============================================================================
// PythonServerManager::shutdown() - shutdown tests
// ============================================================================
// shutdown() on a freshly constructed manager must not throw.
TEST_F(PythonServerTest, ShutdownEmptyManagerDoesNotCrash) {
    PythonServerManager manager;
    EXPECT_NO_THROW(manager.shutdown());
}
// Calling shutdown() three times in a row on the same manager must not throw.
TEST_F(PythonServerTest, ShutdownCalledMultipleTimesDoesNotCrash) {
    PythonServerManager manager;
    EXPECT_NO_THROW({
        for (int attempt = 0; attempt < 3; ++attempt) {
            manager.shutdown();
        }
    });
}
// shutdown() must not throw after a pool initialization attempt has failed.
TEST_F(PythonServerTest, ShutdownAfterFailedInitializationDoesNotCrash) {
    PythonServerManager manager;

    // Initialization is expected to fail: neither path exists.
    PythonVersion bogus_version("3.99.99", "/bad/path", "/bad/python");
    Status init_result = manager.ensure_pool_initialized(bogus_version);
    EXPECT_FALSE(init_result.ok());

    EXPECT_NO_THROW(manager.shutdown());
}
// ============================================================================
// PythonServerManager::get_client() - client retrieval tests
// ============================================================================
// get_client() must fail and leave the client pointer null when the Python
// version refers to paths that do not exist.
TEST_F(PythonServerTest, GetClientWithInvalidVersionFails) {
    PythonServerManager manager;
    PythonVersion bogus_version("3.9.16", "/invalid/path", "/invalid/python");

    // Minimal inline-UDF metadata for the request.
    PythonUDFMeta udf_meta;
    udf_meta.name = "test_udf";
    udf_meta.symbol = "test_func";
    udf_meta.runtime_version = "3.9.16";
    udf_meta.type = PythonUDFLoadType::INLINE;
    udf_meta.client_type = PythonClientType::UDF;

    std::shared_ptr<PythonUDFClient> udf_client;
    Status result = manager.get_client(udf_meta, bogus_version, &udf_client);

    EXPECT_FALSE(result.ok());
    EXPECT_EQ(udf_client, nullptr);
}
// ============================================================================
// Configuration tests
// ============================================================================
// config::max_python_process_num must be readable and non-negative.
TEST_F(PythonServerTest, MaxPythonProcessNumConfigIsAccessible) {
    const int configured_limit = config::max_python_process_num;
    // 0 means use number of CPU cores
    EXPECT_GE(configured_limit, 0);
}
// ============================================================================
// Destructor tests
// ============================================================================
// Destroying a manager after a failed fork() must not crash.
TEST_F(PythonServerTest, DestructorCleansUpResources) {
    {
        PythonServerManager scoped_manager;
        // The fork is expected to fail; the point is that the destructor still
        // runs cleanly afterwards when the scope closes.
        PythonVersion bogus_version("3.9.16", "/bad", "/bad");
        ProcessPtr spawned;
        Status fork_result = scoped_manager.fork(bogus_version, &spawned);
        EXPECT_FALSE(fork_result.ok());
    }
    // Reaching this line means the destructor returned without crashing.
    SUCCEED();
}
// ============================================================================
// Success-path tests using a fake Python script
// ============================================================================
// fork() with the fake interpreter must yield a live process with a positive
// child pid and a URI containing "grpc+unix://", and the process must report
// is_shutdown() after shutdown().
TEST_F(PythonServerTest, ForkSuccessWithFakePython) {
    setup_doris_home();
    std::string python_path = create_fake_python_with_socket_creation("3.9.16");
    PythonServerManager mgr;
    PythonVersion version("3.9.16", test_dir_, python_path);
    ProcessPtr process;
    Status status = mgr.fork(version, &process);

    // Verify fork succeeded
    EXPECT_TRUE(status.ok()) << status.to_string();
    // Fatal assertion: every statement below dereferences `process`, so a null
    // pointer must end the test here instead of crashing the test binary.
    ASSERT_NE(process, nullptr);
    EXPECT_TRUE(process->is_alive());
    EXPECT_GT(process->get_child_pid(), 0);

    // Verify socket path is correct
    std::string uri = process->get_uri();
    EXPECT_TRUE(uri.find("grpc+unix://") != std::string::npos);

    // Cleanup
    process->shutdown();
    EXPECT_TRUE(process->is_shutdown());
}
// ensure_pool_initialized() must succeed when backed by the fake interpreter.
TEST_F(PythonServerTest, EnsurePoolInitializedSuccess) {
    setup_doris_home();
    const std::string interpreter = create_fake_python_with_socket_creation("3.9.16");

    // A single-process pool keeps the test fast.
    config::max_python_process_num = 1;

    PythonServerManager manager;
    PythonVersion py_version("3.9.16", test_dir_, interpreter);
    Status init_result = manager.ensure_pool_initialized(py_version);
    EXPECT_TRUE(init_result.ok()) << init_result.to_string();

    // Stop whatever the pool started.
    manager.shutdown();
}
// Calling ensure_pool_initialized() twice for the same version must succeed
// both times.
TEST_F(PythonServerTest, EnsurePoolInitializedIdempotent) {
    setup_doris_home();
    const std::string interpreter = create_fake_python_with_socket_creation("3.9.16");
    config::max_python_process_num = 1;

    PythonServerManager manager;
    PythonVersion py_version("3.9.16", test_dir_, interpreter);

    // First call performs the initialization.
    Status first_result = manager.ensure_pool_initialized(py_version);
    EXPECT_TRUE(first_result.ok()) << first_result.to_string();

    // Second call targets the already-initialized version and must also be OK.
    Status second_result = manager.ensure_pool_initialized(py_version);
    EXPECT_TRUE(second_result.ok()) << second_result.to_string();

    manager.shutdown();
}
// After a successful pool initialization, get_process() must return a live,
// non-null process.
TEST_F(PythonServerTest, GetProcessFromInitializedPool) {
    setup_doris_home();
    std::string python_path = create_fake_python_with_socket_creation("3.9.16");
    config::max_python_process_num = 1;
    PythonServerManager mgr;
    PythonVersion version("3.9.16", test_dir_, python_path);

    // Initialize the pool first
    Status init_status = mgr.ensure_pool_initialized(version);
    EXPECT_TRUE(init_status.ok()) << init_status.to_string();

    // Get a process
    ProcessPtr process;
    Status status = mgr.get_process(version, &process);
    EXPECT_TRUE(status.ok()) << status.to_string();
    EXPECT_NE(process, nullptr);
    // Guard the dereference: EXPECT_NE is non-fatal, so a null pointer would
    // otherwise crash here. A guard (not ASSERT_*) keeps mgr.shutdown() below
    // running even when the lookup fails.
    if (process != nullptr) {
        EXPECT_TRUE(process->is_alive());
    }

    mgr.shutdown();
}
// With a pool size of 2, four consecutive get_process() calls must each succeed
// and return a non-null process. The test does not assert how requests are
// distributed across the pool: consecutive requests may or may not land on the
// same process.
TEST_F(PythonServerTest, GetProcessLoadBalancing) {
    setup_doris_home();
    std::string python_path = create_fake_python_with_socket_creation("3.9.16");

    // Create a pool with 2 processes
    config::max_python_process_num = 2;
    PythonServerManager mgr;
    PythonVersion version("3.9.16", test_dir_, python_path);
    Status init_status = mgr.ensure_pool_initialized(version);
    EXPECT_TRUE(init_status.ok()) << init_status.to_string();

    // Issue more requests than there are processes and check every result,
    // not just the first two.
    constexpr int kRequestCount = 4;
    ProcessPtr processes[kRequestCount];
    for (int i = 0; i < kRequestCount; ++i) {
        Status status = mgr.get_process(version, &processes[i]);
        EXPECT_TRUE(status.ok()) << "request " << i << ": " << status.to_string();
        EXPECT_NE(processes[i], nullptr) << "request " << i;
    }

    mgr.shutdown();
}
// A process obtained from the pool must be alive before the manager is shut
// down and must report is_shutdown() afterwards.
TEST_F(PythonServerTest, ShutdownWithRunningProcesses) {
    setup_doris_home();
    std::string python_path = create_fake_python_with_socket_creation("3.9.16");
    config::max_python_process_num = 2;
    PythonServerManager mgr;
    PythonVersion version("3.9.16", test_dir_, python_path);

    // Initialize the pool
    Status init_status = mgr.ensure_pool_initialized(version);
    EXPECT_TRUE(init_status.ok()) << init_status.to_string();

    // Get a process reference
    ProcessPtr process;
    Status get_status = mgr.get_process(version, &process);
    EXPECT_TRUE(get_status.ok()) << get_status.to_string();
    EXPECT_NE(process, nullptr);
    // Guard each dereference: the expectations above are non-fatal, so a failed
    // lookup would otherwise dereference a null pointer. Guards (not ASSERT_*)
    // keep mgr.shutdown() running on the failure path.
    if (process != nullptr) {
        EXPECT_TRUE(process->is_alive());
    }

    // Shutdown should terminate all processes
    mgr.shutdown();

    // Process should be shut down
    if (process != nullptr) {
        EXPECT_TRUE(process->is_shutdown());
    }
}
// Pools for two different Python versions can be initialized on the same
// manager, and processes taken from each pool have different child pids.
TEST_F(PythonServerTest, MultipleVersionPools) {
    setup_doris_home();

    // Create two fake Pythons with different versions
    std::string python39_path = test_dir_ + "/bin/python3.9";
    std::string python310_path = test_dir_ + "/bin/python3.10";
    fs::create_directories(test_dir_ + "/bin");

    // Writes an executable bash script at `path` that prints `version_banner`
    // for --version; otherwise it touches "<arg3 minus grpc+unix://>_<pid>.sock"
    // and loops until it receives TERM or INT, removing the file on exit.
    const auto write_fake_python = [](const std::string& path,
                                      const std::string& version_banner) {
        {
            std::ofstream ofs(path);
            ofs << "#!/bin/bash\n";
            ofs << "if [ \"$1\" = \"--version\" ]; then echo '" << version_banner
                << "'; exit 0; fi\n";
            ofs << "SOCKET_BASE=\"${3#grpc+unix://}\"\n";
            ofs << "touch \"${SOCKET_BASE}_$$.sock\"\n";
            ofs << "trap 'rm -f \"${SOCKET_BASE}_$$.sock\"; exit 0' TERM INT\n";
            ofs << "while true; do sleep 1; done\n";
        }
        fs::permissions(path, fs::perms::owner_all);
    };
    write_fake_python(python39_path, "Python 3.9.16");
    write_fake_python(python310_path, "Python 3.10.0");

    config::max_python_process_num = 1;
    PythonServerManager mgr;
    PythonVersion version39("3.9.16", test_dir_, python39_path);
    PythonVersion version310("3.10.0", test_dir_, python310_path);

    // Initialize the pools for both versions
    EXPECT_TRUE(mgr.ensure_pool_initialized(version39).ok());
    EXPECT_TRUE(mgr.ensure_pool_initialized(version310).ok());

    // Get a process from each pool
    ProcessPtr p39, p310;
    EXPECT_TRUE(mgr.get_process(version39, &p39).ok());
    EXPECT_TRUE(mgr.get_process(version310, &p310).ok());
    EXPECT_NE(p39, nullptr);
    EXPECT_NE(p310, nullptr);

    // Verify they are different processes. The dereference is guarded because
    // the expectations above are non-fatal; a guard (not ASSERT_*) keeps
    // mgr.shutdown() running on the failure path.
    if (p39 != nullptr && p310 != nullptr) {
        EXPECT_NE(p39->get_child_pid(), p310->get_child_pid());
    }

    mgr.shutdown();
}
} // namespace doris