| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "udf/python/python_server.h" |
| |
| #include <gtest/gtest.h> |
| #include <sys/socket.h> |
| #include <sys/un.h> |
| |
| #include <boost/process.hpp> |
| #include <filesystem> |
| #include <fstream> |
| #include <string> |
| #include <vector> |
| |
| #include "common/config.h" |
| #include "common/status.h" |
| #include "udf/python/python_env.h" |
| #include "udf/python/python_udf_client.h" |
| #include "udf/python/python_udf_meta.h" |
| |
| namespace doris { |
| |
| namespace fs = std::filesystem; |
| namespace bp = boost::process; |
| |
| class PythonServerTest : public ::testing::Test { |
| protected: |
| std::string test_dir_; |
| const char* original_doris_home_ = nullptr; |
| int original_max_python_process_num_ = 0; |
| |
| void SetUp() override { |
| test_dir_ = fs::temp_directory_path().string() + "/python_server_test_" + |
| std::to_string(getpid()) + "_" + std::to_string(rand()); |
| fs::create_directories(test_dir_); |
| |
| original_doris_home_ = std::getenv("DORIS_HOME"); |
| original_max_python_process_num_ = config::max_python_process_num; |
| } |
| |
| void TearDown() override { |
| // Restore configuration |
| config::max_python_process_num = original_max_python_process_num_; |
| |
| if (!test_dir_.empty() && fs::exists(test_dir_)) { |
| fs::remove_all(test_dir_); |
| } |
| |
| if (original_doris_home_) { |
| setenv("DORIS_HOME", original_doris_home_, 1); |
| } else { |
| unsetenv("DORIS_HOME"); |
| } |
| } |
| |
| // Create a fake Python script that creates a socket file and keeps running |
| // Arg: script prints "Python X.Y.Z" for version detection |
| std::string create_fake_python_with_socket_creation(const std::string& version = "3.9.16") { |
| std::string bin_dir = test_dir_ + "/bin"; |
| std::string python_path = bin_dir + "/python3"; |
| fs::create_directories(bin_dir); |
| |
| // Create fake Python script |
| // Behavior: 1. If the arg is --version, print the version |
| // 2. Otherwise, create the socket file and wait |
| std::ofstream ofs(python_path); |
| ofs << "#!/bin/bash\n"; |
| ofs << "if [ \"$1\" = \"--version\" ]; then\n"; |
| ofs << " echo 'Python " << version << "'\n"; |
| ofs << " exit 0\n"; |
| ofs << "fi\n"; |
| // Extract socket path prefix from args and create the socket file |
| // Arg format: -u script.py grpc+unix:///tmp/doris_python_udf |
| ofs << "SOCKET_PREFIX=\"$3\"\n"; |
| ofs << "# Extract path part (strip grpc+unix://)\n"; |
| ofs << "SOCKET_BASE=\"${SOCKET_PREFIX#grpc+unix://}\"\n"; |
| ofs << "SOCKET_FILE=\"${SOCKET_BASE}_$$.sock\"\n"; |
| ofs << "# Create socket file\n"; |
| ofs << "touch \"$SOCKET_FILE\"\n"; |
| ofs << "# Wait to be terminated\n"; |
| ofs << "trap 'rm -f \"$SOCKET_FILE\"; exit 0' TERM INT\n"; |
| ofs << "while true; do sleep 1; done\n"; |
| ofs.close(); |
| fs::permissions(python_path, fs::perms::owner_all); |
| |
| return python_path; |
| } |
| |
| std::string create_fake_python_with_delay_and_socket_creation(const std::string& binary_name, |
| const std::string& version, |
| int delay_ms) { |
| std::string bin_dir = test_dir_ + "/bin"; |
| std::string python_path = bin_dir + "/" + binary_name; |
| fs::create_directories(bin_dir); |
| |
| std::ofstream ofs(python_path); |
| ofs << "#!/bin/bash\n"; |
| ofs << "if [ \"$1\" = \"--version\" ]; then\n"; |
| ofs << " echo 'Python " << version << "'\n"; |
| ofs << " exit 0\n"; |
| ofs << "fi\n"; |
| ofs << "sleep " << (delay_ms / 1000.0) << "\n"; |
| ofs << "SOCKET_PREFIX=\"$3\"\n"; |
| ofs << "SOCKET_BASE=\"${SOCKET_PREFIX#grpc+unix://}\"\n"; |
| ofs << "SOCKET_FILE=\"${SOCKET_BASE}_$$.sock\"\n"; |
| ofs << "touch \"$SOCKET_FILE\"\n"; |
| ofs << "trap 'rm -f \"$SOCKET_FILE\"; exit 0' TERM INT\n"; |
| ofs << "while true; do sleep 1; done\n"; |
| ofs.close(); |
| fs::permissions(python_path, fs::perms::owner_all); |
| |
| return python_path; |
| } |
| |
| // Set DORIS_HOME and create flight server script directory |
| void setup_doris_home() { |
| setenv("DORIS_HOME", test_dir_.c_str(), 1); |
| std::string plugin_dir = test_dir_ + "/plugins/python_udf"; |
| fs::create_directories(plugin_dir); |
| // Create an empty python_server.py (won't be executed because we use fake Python) |
| std::ofstream ofs(plugin_dir + "/python_server.py"); |
| ofs << "# fake server\n"; |
| ofs.close(); |
| } |
| |
| ProcessPtr create_sleep_process() { |
| bp::ipstream output_stream; |
| std::string sleep_path = fs::exists("/bin/sleep") ? "/bin/sleep" : "/usr/bin/sleep"; |
| bp::child child(sleep_path, "60", bp::std_out > output_stream, bp::std_err > bp::null); |
| return std::make_shared<PythonUDFProcess>(std::move(child), std::move(output_stream)); |
| } |
| }; |
| |
| // ============================================================================ |
| // PythonServerManager::instance() - singleton test |
| // ============================================================================ |
| |
| TEST_F(PythonServerTest, SingletonReturnsSameInstance) { |
| PythonServerManager& mgr1 = PythonServerManager::instance(); |
| PythonServerManager& mgr2 = PythonServerManager::instance(); |
| |
| // Verify both calls return the same instance |
| EXPECT_EQ(&mgr1, &mgr2); |
| } |
| |
| // ============================================================================ |
| // PythonServerManager::_get_process() - process retrieval test |
| // ============================================================================ |
| |
| TEST_F(PythonServerTest, GetProcessFromEmptyPoolReturnsError) { |
| PythonServerManager mgr; |
| |
| PythonVersion version("3.9.16", "/fake/path", "/fake/python"); |
| mgr.set_process_pool_for_test(version, {}); |
| auto pool_result = mgr._ensure_pool_initialized(version); |
| ASSERT_TRUE(pool_result.has_value()) << pool_result.error().to_string(); |
| |
| ProcessPtr process; |
| Status status = mgr._get_process(version, pool_result.value(), &process); |
| |
| // Verify: empty pool should return an error before touching process slots. |
| EXPECT_FALSE(status.ok()); |
| EXPECT_TRUE(status.to_string().find("pool is empty") != std::string::npos); |
| EXPECT_EQ(process, nullptr); |
| } |
| |
| // ============================================================================ |
| // PythonServerManager::fork() - process creation test |
| // ============================================================================ |
| |
| TEST_F(PythonServerTest, ForkWithNonExistentPythonReturnsError) { |
| PythonServerManager mgr; |
| |
| PythonVersion invalid_version("3.9.16", test_dir_, test_dir_ + "/nonexistent_python"); |
| |
| ProcessPtr process; |
| Status status = mgr.fork(invalid_version, &process); |
| |
| // Verify: non-existent Python should return an error |
| EXPECT_FALSE(status.ok()); |
| EXPECT_EQ(process, nullptr); |
| } |
| |
| TEST_F(PythonServerTest, ForkWithMissingFlightServerReturnsError) { |
| PythonServerManager mgr; |
| |
| // Set DORIS_HOME to test directory (no flight server script) |
| setenv("DORIS_HOME", test_dir_.c_str(), 1); |
| |
| // Create a fake python executable |
| std::string python_path = test_dir_ + "/bin/python3"; |
| fs::create_directories(test_dir_ + "/bin"); |
| { |
| std::ofstream ofs(python_path); |
| ofs << "#!/bin/bash\nexit 1"; // exits immediately |
| } |
| fs::permissions(python_path, fs::perms::owner_all); |
| |
| PythonVersion version("3.9.16", test_dir_, python_path); |
| |
| ProcessPtr process; |
| Status status = mgr.fork(version, &process); |
| |
| // Verify: when the flight server script does not exist, fork should fail |
| EXPECT_FALSE(status.ok()); |
| EXPECT_EQ(process, nullptr); |
| } |
| |
| TEST_F(PythonServerTest, ForkWithProcessThatExitsImmediatelyReturnsError) { |
| PythonServerManager mgr; |
| |
| // Set DORIS_HOME |
| setenv("DORIS_HOME", test_dir_.c_str(), 1); |
| |
| // Create flight server directory structure |
| std::string plugin_dir = test_dir_ + "/plugins/python_udf"; |
| fs::create_directories(plugin_dir); |
| |
| // Create a fake python_server.py (will be executed by Python but exits immediately) |
| std::string server_path = plugin_dir + "/python_server.py"; |
| { |
| std::ofstream ofs(server_path); |
| ofs << "import sys; sys.exit(1)"; |
| } |
| |
| // Create a fake python executable |
| std::string python_path = test_dir_ + "/bin/python3"; |
| fs::create_directories(test_dir_ + "/bin"); |
| { |
| std::ofstream ofs(python_path); |
| ofs << "#!/bin/bash\nexit 1"; // exits immediately, does not create socket file |
| } |
| fs::permissions(python_path, fs::perms::owner_all); |
| |
| PythonVersion version("3.9.16", test_dir_, python_path); |
| |
| ProcessPtr process; |
| Status status = mgr.fork(version, &process); |
| |
| // Verify: process exits immediately (socket file not created), should return an error |
| EXPECT_FALSE(status.ok()); |
| // Error message should contain socket-related content |
| std::string err_msg = status.to_string(); |
| EXPECT_TRUE(err_msg.find("socket") != std::string::npos || |
| err_msg.find("start") != std::string::npos); |
| } |
| |
| // ============================================================================ |
| // PythonServerManager::_ensure_pool_initialized() - pool initialization test |
| // ============================================================================ |
| |
| TEST_F(PythonServerTest, EnsurePoolInitializedWithInvalidVersionFails) { |
| PythonServerManager mgr; |
| |
| PythonVersion invalid_version("3.99.99", "/non/existent/path", "/non/existent/python"); |
| |
| auto result = mgr._ensure_pool_initialized(invalid_version); |
| |
| // Verify: invalid version should cause initialization to fail |
| EXPECT_FALSE(result.has_value()); |
| // Error message should indicate all process creations failed |
| EXPECT_TRUE(result.error().to_string().find("Failed") != std::string::npos || |
| result.error().to_string().find("failed") != std::string::npos); |
| } |
| |
| // ============================================================================ |
| // PythonServerManager::shutdown() - shutdown test |
| // ============================================================================ |
| |
| TEST_F(PythonServerTest, ShutdownEmptyManagerDoesNotCrash) { |
| PythonServerManager mgr; |
| |
| // Verify: calling shutdown on empty manager does not crash |
| EXPECT_NO_THROW(mgr.shutdown()); |
| } |
| |
| TEST_F(PythonServerTest, ShutdownCalledMultipleTimesDoesNotCrash) { |
| PythonServerManager mgr; |
| |
| // Verify: calling shutdown multiple times does not crash |
| EXPECT_NO_THROW({ |
| mgr.shutdown(); |
| mgr.shutdown(); |
| mgr.shutdown(); |
| }); |
| } |
| |
| TEST_F(PythonServerTest, ShutdownAfterFailedInitializationDoesNotCrash) { |
| PythonServerManager mgr; |
| |
| // Try initialization first (expected to fail) |
| PythonVersion invalid_version("3.99.99", "/bad/path", "/bad/python"); |
| auto result = mgr._ensure_pool_initialized(invalid_version); |
| EXPECT_FALSE(result.has_value()); |
| |
| // Verify: calling shutdown after failed initialization does not crash |
| EXPECT_NO_THROW(mgr.shutdown()); |
| } |
| |
| TEST_F(PythonServerTest, ClearUdafStateCacheWithoutProcessesIsNoOp) { |
| PythonServerManager mgr; |
| |
| EXPECT_NO_THROW(mgr.clear_udaf_state_cache(12345)); |
| } |
| |
| TEST_F(PythonServerTest, ClearModuleCacheWithoutProcessesIsNoOp) { |
| PythonServerManager mgr; |
| |
| auto status = mgr.clear_module_cache("/tmp/python_udf_cache"); |
| EXPECT_TRUE(status.ok()) << status.to_string(); |
| } |
| |
| TEST_F(PythonServerTest, BroadcastActionWithInvalidProcessUriReturnsError) { |
| PythonServerManager mgr; |
| PythonVersion version("3.9.16", test_dir_, test_dir_ + "/bin/python3"); |
| ProcessPtr process = create_sleep_process(); |
| ASSERT_NE(process, nullptr); |
| ASSERT_TRUE(process->is_alive()); |
| process->set_uri_for_test("invalid-python-flight-uri"); |
| |
| mgr.set_process_pool_for_test(version, {process}); |
| auto status = mgr.broadcast_action_to_processes_for_test( |
| "clear_udaf_state_cache", R"({"function_id": 12345})", "function_id=12345"); |
| |
| EXPECT_FALSE(status.ok()); |
| EXPECT_NE(status.to_string().find("clear_udaf_state_cache failed for function_id=12345"), |
| std::string::npos); |
| EXPECT_NE(status.to_string().find("success=0, failed=1"), std::string::npos); |
| |
| mgr.shutdown(); |
| } |
| |
| // ============================================================================ |
| // PythonServerManager::get_client() - client retrieval test |
| // ============================================================================ |
| |
| TEST_F(PythonServerTest, GetClientWithInvalidVersionFails) { |
| PythonServerManager mgr; |
| |
| PythonVersion invalid_version("3.9.16", "/invalid/path", "/invalid/python"); |
| PythonUDFMeta meta; |
| meta.name = "test_udf"; |
| meta.symbol = "test_func"; |
| meta.runtime_version = "3.9.16"; |
| meta.type = PythonUDFLoadType::INLINE; |
| meta.client_type = PythonClientType::UDF; |
| |
| std::shared_ptr<PythonUDFClient> client; |
| Status status = mgr.get_client(meta, invalid_version, &client); |
| |
| // Verify: getting client with invalid version should fail |
| EXPECT_FALSE(status.ok()); |
| EXPECT_EQ(client, nullptr); |
| } |
| |
| // ============================================================================ |
| // configuration test |
| // ============================================================================ |
| |
| TEST_F(PythonServerTest, MaxPythonProcessNumConfigIsAccessible) { |
| // Verify configuration value is accessible and within a valid range |
| int max_num = config::max_python_process_num; |
| EXPECT_GE(max_num, 0); // 0 means use number of CPU cores |
| } |
| |
| // ============================================================================ |
| // destructor test |
| // ============================================================================ |
| |
| TEST_F(PythonServerTest, DestructorCleansUpResources) { |
| // Create and destroy manager to ensure no memory leaks or crashes |
| { |
| PythonServerManager mgr; |
| // Try some operations (they fail but should not affect destructor) |
| PythonVersion invalid_version("3.9.16", "/bad", "/bad"); |
| ProcessPtr process; |
| Status status = mgr.fork(invalid_version, &process); |
| EXPECT_FALSE(status.ok()); |
| } |
| // If we reach here without crashing, destructor works properly |
| SUCCEED(); |
| } |
| |
| // ============================================================================ |
| // success-path test using a fake Python script |
| // ============================================================================ |
| |
| TEST_F(PythonServerTest, ForkSuccessWithFakePython) { |
| setup_doris_home(); |
| std::string python_path = create_fake_python_with_socket_creation("3.9.16"); |
| |
| PythonServerManager mgr; |
| PythonVersion version("3.9.16", test_dir_, python_path); |
| |
| ProcessPtr process; |
| Status status = mgr.fork(version, &process); |
| |
| // Verify fork succeeded |
| EXPECT_TRUE(status.ok()) << status.to_string(); |
| EXPECT_NE(process, nullptr); |
| EXPECT_TRUE(process->is_alive()); |
| EXPECT_GT(process->get_child_pid(), 0); |
| |
| // Verify socket path is correct |
| std::string uri = process->get_uri(); |
| EXPECT_TRUE(uri.find("grpc+unix://") != std::string::npos); |
| |
| // Cleanup |
| process->shutdown(); |
| EXPECT_TRUE(process->is_shutdown()); |
| } |
| |
| TEST_F(PythonServerTest, EnsurePoolInitializedSuccess) { |
| setup_doris_home(); |
| std::string python_path = create_fake_python_with_socket_creation("3.9.16"); |
| |
| // Limit process pool to 1 to speed up the test |
| config::max_python_process_num = 1; |
| |
| PythonServerManager mgr; |
| PythonVersion version("3.9.16", test_dir_, python_path); |
| |
| auto result = mgr._ensure_pool_initialized(version); |
| |
| // Verify pool initialization succeeded |
| EXPECT_TRUE(result.has_value()) << result.error().to_string(); |
| |
| // Cleanup |
| mgr.shutdown(); |
| } |
| |
| TEST_F(PythonServerTest, EnsurePoolInitializedLogsProgressWhileWaitingForSlowProcess) { |
| setup_doris_home(); |
| std::string python_path = |
| create_fake_python_with_delay_and_socket_creation("python3.delayed", "3.9.16", 200); |
| |
| config::max_python_process_num = 1; |
| |
| PythonServerManager mgr; |
| PythonVersion version("3.9.16", test_dir_, python_path); |
| |
| auto result = mgr._ensure_pool_initialized(version); |
| |
| EXPECT_TRUE(result.has_value()) << result.error().to_string(); |
| |
| mgr.shutdown(); |
| } |
| |
| TEST_F(PythonServerTest, EnsurePoolInitializedIdempotent) { |
| setup_doris_home(); |
| std::string python_path = create_fake_python_with_socket_creation("3.9.16"); |
| |
| config::max_python_process_num = 1; |
| |
| PythonServerManager mgr; |
| PythonVersion version("3.9.16", test_dir_, python_path); |
| |
| // First initialization |
| auto result1 = mgr._ensure_pool_initialized(version); |
| EXPECT_TRUE(result1.has_value()) << result1.error().to_string(); |
| |
| // Second initialization should return immediately (version already initialized) |
| auto result2 = mgr._ensure_pool_initialized(version); |
| EXPECT_TRUE(result2.has_value()) << result2.error().to_string(); |
| |
| mgr.shutdown(); |
| } |
| |
| TEST_F(PythonServerTest, GetProcessFromInitializedPool) { |
| setup_doris_home(); |
| std::string python_path = create_fake_python_with_socket_creation("3.9.16"); |
| |
| config::max_python_process_num = 1; |
| |
| PythonServerManager mgr; |
| PythonVersion version("3.9.16", test_dir_, python_path); |
| |
| // Initialize the pool first |
| auto init_result = mgr._ensure_pool_initialized(version); |
| EXPECT_TRUE(init_result.has_value()) << init_result.error().to_string(); |
| |
| // Get a process |
| ProcessPtr process; |
| Status status = mgr._get_process(version, init_result.value(), &process); |
| |
| EXPECT_TRUE(status.ok()) << status.to_string(); |
| EXPECT_NE(process, nullptr); |
| EXPECT_TRUE(process->is_alive()); |
| |
| mgr.shutdown(); |
| } |
| |
| TEST_F(PythonServerTest, GetProcessRecreatesDeadProcessWhenNoAliveProcess) { |
| setup_doris_home(); |
| std::string python_path = create_fake_python_with_socket_creation("3.9.16"); |
| |
| config::max_python_process_num = 1; |
| |
| PythonServerManager mgr; |
| PythonVersion version("3.9.16", test_dir_, python_path); |
| |
| auto pool_result = mgr._ensure_pool_initialized(version); |
| ASSERT_TRUE(pool_result.has_value()) << pool_result.error().to_string(); |
| |
| ProcessPtr first_process; |
| ASSERT_TRUE(mgr._get_process(version, pool_result.value(), &first_process).ok()); |
| ASSERT_NE(first_process, nullptr); |
| ASSERT_TRUE(first_process->is_alive()); |
| pid_t first_pid = first_process->get_child_pid(); |
| |
| first_process->shutdown(); |
| ASSERT_FALSE(first_process->is_alive()); |
| |
| ProcessPtr replacement; |
| Status status = mgr._get_process(version, pool_result.value(), &replacement); |
| |
| EXPECT_TRUE(status.ok()) << status.to_string(); |
| ASSERT_NE(replacement, nullptr); |
| EXPECT_TRUE(replacement->is_alive()); |
| EXPECT_NE(replacement->get_child_pid(), first_pid); |
| |
| mgr.shutdown(); |
| } |
| |
| TEST_F(PythonServerTest, GetProcessSkipsDeadProcessWhenAliveProcessExists) { |
| setup_doris_home(); |
| std::string python_path = create_fake_python_with_socket_creation("3.9.16"); |
| |
| PythonServerManager mgr; |
| PythonVersion version("3.9.16", test_dir_, python_path); |
| |
| ProcessPtr alive_process; |
| ASSERT_TRUE(mgr.fork(version, &alive_process).ok()); |
| ASSERT_NE(alive_process, nullptr); |
| ASSERT_TRUE(alive_process->is_alive()); |
| |
| ProcessPtr dead_process; |
| ASSERT_TRUE(mgr.fork(version, &dead_process).ok()); |
| ASSERT_NE(dead_process, nullptr); |
| pid_t dead_pid = dead_process->get_child_pid(); |
| dead_process->shutdown(); |
| ASSERT_FALSE(dead_process->is_alive()); |
| |
| mgr.set_process_pool_for_test(version, {alive_process, dead_process}); |
| auto pool_result = mgr._ensure_pool_initialized(version); |
| ASSERT_TRUE(pool_result.has_value()) << pool_result.error().to_string(); |
| |
| ProcessPtr selected; |
| Status status = mgr._get_process(version, pool_result.value(), &selected); |
| |
| EXPECT_TRUE(status.ok()) << status.to_string(); |
| EXPECT_EQ(selected, alive_process); |
| EXPECT_FALSE(mgr.process_pool_for_test(version)[1]->is_alive()); |
| EXPECT_EQ(mgr.process_pool_for_test(version)[1]->get_child_pid(), dead_pid); |
| |
| mgr.shutdown(); |
| } |
| |
| TEST_F(PythonServerTest, GetProcessLoadBalancing) { |
| setup_doris_home(); |
| std::string python_path = create_fake_python_with_socket_creation("3.9.16"); |
| |
| // Create a pool with 2 processes |
| config::max_python_process_num = 2; |
| |
| PythonServerManager mgr; |
| PythonVersion version("3.9.16", test_dir_, python_path); |
| |
| auto init_result = mgr._ensure_pool_initialized(version); |
| EXPECT_TRUE(init_result.has_value()) << init_result.error().to_string(); |
| |
| // Get multiple processes to verify load balancing |
| ProcessPtr p1, p2, p3, p4; |
| EXPECT_TRUE(mgr._get_process(version, init_result.value(), &p1).ok()); |
| EXPECT_TRUE(mgr._get_process(version, init_result.value(), &p2).ok()); |
| EXPECT_TRUE(mgr._get_process(version, init_result.value(), &p3).ok()); |
| EXPECT_TRUE(mgr._get_process(version, init_result.value(), &p4).ok()); |
| |
| // With 2 processes, load balancing distributes requests across different processes |
| // p1 and p2 may be same or different processes |
| EXPECT_NE(p1, nullptr); |
| EXPECT_NE(p2, nullptr); |
| |
| mgr.shutdown(); |
| } |
| |
| TEST_F(PythonServerTest, ShutdownWithRunningProcesses) { |
| setup_doris_home(); |
| std::string python_path = create_fake_python_with_socket_creation("3.9.16"); |
| |
| config::max_python_process_num = 2; |
| |
| PythonServerManager mgr; |
| PythonVersion version("3.9.16", test_dir_, python_path); |
| |
| // Initialize the pool |
| auto init_result = mgr._ensure_pool_initialized(version); |
| EXPECT_TRUE(init_result.has_value()) << init_result.error().to_string(); |
| |
| // Get a process reference |
| ProcessPtr process; |
| EXPECT_TRUE(mgr._get_process(version, init_result.value(), &process).ok()); |
| EXPECT_TRUE(process->is_alive()); |
| |
| // Shutdown should terminate all processes |
| mgr.shutdown(); |
| |
| // Process should be shut down |
| EXPECT_TRUE(process->is_shutdown()); |
| } |
| |
| TEST_F(PythonServerTest, MultipleVersionPools) { |
| setup_doris_home(); |
| |
| // Create two fake Pythons with different versions |
| std::string python39_path = test_dir_ + "/bin/python3.9"; |
| std::string python310_path = test_dir_ + "/bin/python3.10"; |
| fs::create_directories(test_dir_ + "/bin"); |
| |
| // Python 3.9 |
| { |
| std::ofstream ofs(python39_path); |
| ofs << "#!/bin/bash\n"; |
| ofs << "if [ \"$1\" = \"--version\" ]; then echo 'Python 3.9.16'; exit 0; fi\n"; |
| ofs << "SOCKET_BASE=\"${3#grpc+unix://}\"\n"; |
| ofs << "touch \"${SOCKET_BASE}_$$.sock\"\n"; |
| ofs << "trap 'rm -f \"${SOCKET_BASE}_$$.sock\"; exit 0' TERM INT\n"; |
| ofs << "while true; do sleep 1; done\n"; |
| } |
| fs::permissions(python39_path, fs::perms::owner_all); |
| |
| // Python 3.10 |
| { |
| std::ofstream ofs(python310_path); |
| ofs << "#!/bin/bash\n"; |
| ofs << "if [ \"$1\" = \"--version\" ]; then echo 'Python 3.10.0'; exit 0; fi\n"; |
| ofs << "SOCKET_BASE=\"${3#grpc+unix://}\"\n"; |
| ofs << "touch \"${SOCKET_BASE}_$$.sock\"\n"; |
| ofs << "trap 'rm -f \"${SOCKET_BASE}_$$.sock\"; exit 0' TERM INT\n"; |
| ofs << "while true; do sleep 1; done\n"; |
| } |
| fs::permissions(python310_path, fs::perms::owner_all); |
| |
| config::max_python_process_num = 1; |
| |
| PythonServerManager mgr; |
| PythonVersion version39("3.9.16", test_dir_, python39_path); |
| PythonVersion version310("3.10.0", test_dir_, python310_path); |
| |
| // Initialize pools for two versions |
| auto pool39_result = mgr._ensure_pool_initialized(version39); |
| auto pool310_result = mgr._ensure_pool_initialized(version310); |
| EXPECT_TRUE(pool39_result.has_value()) << pool39_result.error().to_string(); |
| EXPECT_TRUE(pool310_result.has_value()) << pool310_result.error().to_string(); |
| |
| // Retrieve processes from both pools |
| ProcessPtr p39, p310; |
| EXPECT_TRUE(mgr._get_process(version39, pool39_result.value(), &p39).ok()); |
| EXPECT_TRUE(mgr._get_process(version310, pool310_result.value(), &p310).ok()); |
| |
| // Verify they are different processes |
| EXPECT_NE(p39->get_child_pid(), p310->get_child_pid()); |
| |
| mgr.shutdown(); |
| } |
| |
| TEST_F(PythonServerTest, EnsurePoolInitializedForDifferentVersionsDoesNotShareVersionLock) { |
| setup_doris_home(); |
| |
| config::max_python_process_num = 1; |
| |
| std::string python39_path = |
| create_fake_python_with_delay_and_socket_creation("python3.9", "3.9.16", 1200); |
| std::string python310_path = |
| create_fake_python_with_delay_and_socket_creation("python3.10", "3.10.0", 1200); |
| |
| PythonServerManager mgr; |
| PythonVersion version39("3.9.16", test_dir_, python39_path); |
| PythonVersion version310("3.10.0", test_dir_, python310_path); |
| |
| auto start = std::chrono::steady_clock::now(); |
| auto future39 = std::async(std::launch::async, |
| [&]() { return mgr._ensure_pool_initialized(version39); }); |
| auto future310 = std::async(std::launch::async, |
| [&]() { return mgr._ensure_pool_initialized(version310); }); |
| |
| auto result39 = future39.get(); |
| auto result310 = future310.get(); |
| auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>( |
| std::chrono::steady_clock::now() - start); |
| |
| EXPECT_TRUE(result39.has_value()) << result39.error().to_string(); |
| EXPECT_TRUE(result310.has_value()) << result310.error().to_string(); |
| // If both versions still contended on one manager-wide lock, the elapsed time would |
| // be close to two serialized 1.2s startups instead of a single startup window. |
| EXPECT_LT(elapsed.count(), 2200); |
| |
| mgr.shutdown(); |
| } |
| |
| // ============================================================================ |
| // PythonServerManager::_check_and_recreate_processes() - health-check recreation test |
| // ============================================================================ |
| |
| TEST_F(PythonServerTest, CheckAndRecreateProcessesRecreatesDeadProcess) { |
| setup_doris_home(); |
| std::string python_path = create_fake_python_with_socket_creation("3.9.16"); |
| |
| PythonServerManager mgr; |
| PythonVersion version("3.9.16", test_dir_, python_path); |
| |
| ProcessPtr alive_process; |
| ASSERT_TRUE(mgr.fork(version, &alive_process).ok()); |
| ASSERT_NE(alive_process, nullptr); |
| ASSERT_TRUE(alive_process->is_alive()); |
| |
| ProcessPtr dead_process; |
| ASSERT_TRUE(mgr.fork(version, &dead_process).ok()); |
| ASSERT_NE(dead_process, nullptr); |
| pid_t dead_pid_before = dead_process->get_child_pid(); |
| dead_process->shutdown(); |
| ASSERT_FALSE(dead_process->is_alive()); |
| |
| mgr.set_process_pool_for_test(version, {alive_process, dead_process, nullptr}); |
| |
| mgr.check_and_recreate_processes_for_test(); |
| |
| ASSERT_EQ(mgr.process_pool_for_test(version).size(), 3); |
| EXPECT_EQ(mgr.process_pool_for_test(version)[0], alive_process); |
| EXPECT_EQ(mgr.process_pool_for_test(version)[2], nullptr); |
| |
| ProcessPtr recreated = mgr.process_pool_for_test(version)[1]; |
| ASSERT_NE(recreated, nullptr); |
| EXPECT_TRUE(recreated->is_alive()); |
| EXPECT_NE(recreated->get_child_pid(), dead_pid_before); |
| |
| mgr.shutdown(); |
| } |
| |
| TEST_F(PythonServerTest, CheckAndRecreateProcessesErasesDeadProcessWhenRecreateFails) { |
| setup_doris_home(); |
| std::string python_path = create_fake_python_with_socket_creation("3.9.16"); |
| |
| PythonServerManager mgr; |
| PythonVersion live_version("3.9.16", test_dir_, python_path); |
| |
| ProcessPtr dead_process_1; |
| ASSERT_TRUE(mgr.fork(live_version, &dead_process_1).ok()); |
| ASSERT_NE(dead_process_1, nullptr); |
| dead_process_1->shutdown(); |
| ASSERT_FALSE(dead_process_1->is_alive()); |
| |
| ProcessPtr dead_process_2; |
| ASSERT_TRUE(mgr.fork(live_version, &dead_process_2).ok()); |
| ASSERT_NE(dead_process_2, nullptr); |
| dead_process_2->shutdown(); |
| ASSERT_FALSE(dead_process_2->is_alive()); |
| |
| PythonVersion invalid_version("3.9.16", test_dir_, test_dir_ + "/bin/nonexistent_python"); |
| mgr.set_process_pool_for_test(invalid_version, {dead_process_1, dead_process_2}); |
| |
| mgr.check_and_recreate_processes_for_test(); |
| |
| EXPECT_TRUE(mgr.process_pool_for_test(invalid_version).empty()); |
| |
| mgr.shutdown(); |
| } |
| |
| } // namespace doris |