blob: 4a560a4066fa3b988e1d6754718dfa8fdada31db [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "udf/python/python_server.h"
#include <butil/fd_utility.h>
#include <dirent.h>
#include <fmt/core.h>
#include <sys/poll.h>
#include <boost/asio.hpp>
#include <boost/process.hpp>
#include <memory>
#include "common/config.h"
#include "udf/python/python_udaf_client.h"
#include "udf/python/python_udf_client.h"
#include "udf/python/python_udtf_client.h"
namespace doris {
// Creates and initializes one process pool per requested Python version.
// Versions that already have a pool are skipped. The first pool whose init()
// fails aborts the loop and its error is returned; pools created before it stay
// registered.
Status PythonServerManager::init(const std::vector<PythonVersion>& versions) {
    std::lock_guard<std::mutex> guard(_pools_mutex);
    for (const PythonVersion& ver : versions) {
        const bool already_pooled = _pools.count(ver) != 0;
        if (already_pooled) {
            continue;
        }
        PythonUDFProcessPoolPtr pool_for_version = std::make_unique<PythonUDFProcessPool>(
                ver, config::max_python_process_nums, config::min_python_process_nums);
        RETURN_IF_ERROR(pool_for_version->init());
        _pools[ver] = std::move(pool_for_version);
    }
    return Status::OK();
}
// Builds a client of type T (UDF/UDAF/UDTF) on top of a process borrowed from
// the pool for `version`. The pool is created and initialized lazily on first
// use. Only the pool lookup/creation runs under `_pools_mutex`; borrowing the
// process and creating the client happen after the lock is released.
//
// @param func_meta  metadata forwarded to T::create.
// @param version    Python version whose pool supplies the process.
// @param client     out-param receiving the created client.
// @return the first non-OK status from pool init, borrow, or client creation.
//
// NOTE(review): `pool` is a raw pointer used after the lock is released. This
// assumes no concurrent shutdown() clears `_pools` in the meantime — confirm.
template <typename T>
Status PythonServerManager::get_client(const PythonUDFMeta& func_meta, const PythonVersion& version,
                                       std::shared_ptr<T>* client) {
    PythonUDFProcessPool* pool = nullptr;
    {
        std::lock_guard<std::mutex> lock(_pools_mutex);
        // Single lookup: reuse the iterator instead of find() followed by operator[].
        auto it = _pools.find(version);
        if (it == _pools.end()) {
            PythonUDFProcessPoolPtr new_pool = std::make_unique<PythonUDFProcessPool>(
                    version, config::max_python_process_nums, config::min_python_process_nums);
            RETURN_IF_ERROR(new_pool->init());
            it = _pools.emplace(version, std::move(new_pool)).first;
        }
        // Non-owning view; the map keeps ownership of the pool.
        pool = it->second.get();
    }
    ProcessPtr process;
    RETURN_IF_ERROR(pool->borrow_process(&process));
    RETURN_IF_ERROR(T::create(func_meta, std::move(process), client));
    return Status::OK();
}
// Launches a Python UDF server process for `pool`'s Python version. It then waits
// up to 5 seconds for the child to print PYTHON_SERVER_START_SUCCESS_MSG on stdout.
//
// On success, `*process` takes ownership of the child handle and its stdout
// stream. On failure, a still-running child is killed and an InternalError
// carrying the child's captured stdout is returned. Exceptions thrown while
// spawning are converted to InternalError.
Status PythonServerManager::fork(PythonUDFProcessPool* pool, ProcessPtr* process) {
    DCHECK(pool != nullptr);
    const PythonVersion& version = pool->get_python_version();
    // e.g. /usr/local/python3.7/bin/python3
    std::string python_executable_path = version.get_executable_path();
    // e.g. /{DORIS_HOME}/plugins/python_udf/python_server.py
    std::string flight_server_path = get_fight_server_path();
    // e.g. grpc+unix:///home/doris/output/be/lib/udf/python/python_udf
    std::string base_unix_socket_path = get_base_unix_socket_path();
    std::vector<std::string> args = {"-u", // unbuffered output
                                     flight_server_path, base_unix_socket_path};
    boost::process::environment env = boost::this_process::environment();
    boost::process::ipstream child_output; // input stream from child
    try {
        boost::process::child c(
                python_executable_path, args, boost::process::std_out > child_output,
                boost::process::env = env,
                boost::process::on_exit([](int exit_code, const std::error_code& ec) {
                    if (ec) {
                        LOG(WARNING) << "Python UDF server exited with error: " << ec.message();
                    }
                }));
        std::string log_line;
        std::string full_log;
        bool started_successfully = false;
        const auto timeout = std::chrono::milliseconds(5000);
        const auto deadline = std::chrono::steady_clock::now() + timeout;
        // Read end of the child's stdout pipe, used to wait for output with a deadline.
        const int output_fd = child_output.pipe().native_source();
        while (std::chrono::steady_clock::now() < deadline) {
            // std::getline() on a pipe blocks until a full line or EOF arrives. Wait
            // for the fd to become readable first, unless bytes are already buffered
            // in the stream. Otherwise a child that hangs without printing would stall
            // this thread past the deadline.
            if (child_output.rdbuf()->in_avail() <= 0) {
                const auto remaining_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
                                                  deadline - std::chrono::steady_clock::now())
                                                  .count();
                if (remaining_ms <= 0) {
                    break;
                }
                struct pollfd pfd {};
                pfd.fd = output_fd;
                pfd.events = POLLIN;
                const int ready = ::poll(&pfd, 1, static_cast<int>(remaining_ms));
                if (ready == 0) {
                    break; // deadline reached without any new output
                }
                if (ready < 0) {
                    // e.g. interrupted by a signal: back off briefly, then re-check the deadline.
                    std::this_thread::sleep_for(std::chrono::milliseconds(10));
                    continue;
                }
                // Readable, or hang-up/error: getline() below returns data or reports EOF.
            }
            if (std::getline(child_output, log_line)) {
                full_log += log_line;
                full_log += '\n';
                // Log only the new line; the accumulated output is reported on failure.
                LOG(INFO) << fmt::format("Start python server, log_line: {}", log_line);
                if (log_line == PYTHON_SERVER_START_SUCCESS_MSG) {
                    started_successfully = true;
                    break;
                }
            } else {
                // EOF or read error: the child closed its stdout (normally because it exited).
                if (!c.running()) {
                    break;
                }
                std::this_thread::sleep_for(std::chrono::milliseconds(10));
            }
        }
        if (!started_successfully) {
            if (c.running()) {
                c.terminate(); // Boost.Process v1 terminate() sends SIGKILL on POSIX
                c.wait();      // make sure the child is reaped so no zombie is left behind
            }
            std::string error_msg = full_log.empty() ? "No output from Python server" : full_log;
            LOG(ERROR) << "Python server start failed:\n" << error_msg;
            return Status::InternalError("python server start failed:\n{}", error_msg);
        }
        *process = std::make_unique<PythonUDFProcess>(std::move(c), std::move(child_output), pool);
    } catch (const std::exception& e) {
        return Status::InternalError("Failed to start Python UDF server: {}", e.what());
    }
    return Status::OK();
}
// Stops every registered process pool, then removes all pools from the manager.
// The whole operation runs under `_pools_mutex`.
void PythonServerManager::shutdown() {
    std::lock_guard<std::mutex> guard(_pools_mutex);
    for (auto& entry : _pools) {
        auto& owned_pool = entry.second;
        owned_pool->shutdown();
    }
    _pools.clear();
    LOG(INFO) << "Python UDF server manager shutdown successfully";
}
// Explicit template instantiations of PythonServerManager::get_client for the
// UDF, UDAF and UDTF client types. The template body is defined only in this
// .cpp file, so other translation units can link against exactly these three
// instantiations. Supporting a new client type requires adding a line here.
template Status PythonServerManager::get_client<PythonUDFClient>(
const PythonUDFMeta& func_meta, const PythonVersion& version,
std::shared_ptr<PythonUDFClient>* client);
template Status PythonServerManager::get_client<PythonUDAFClient>(
const PythonUDFMeta& func_meta, const PythonVersion& version,
std::shared_ptr<PythonUDAFClient>* client);
template Status PythonServerManager::get_client<PythonUDTFClient>(
const PythonUDFMeta& func_meta, const PythonVersion& version,
std::shared_ptr<PythonUDTFClient>* client);
} // namespace doris