blob: d54c2f473b49ce0a8b5cd7d51d97063f31a30de2 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/user_function_cache.h"
// IWYU pragma: no_include <bthread/errno.h>
#include <errno.h> // IWYU pragma: keep
#include <glog/logging.h>
#include <minizip/unzip.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <atomic>
#include <charconv>
#include <cstdint>
#include <memory>
#include <ostream>
#include <regex>
#include <system_error>
#include <utility>
#include <vector>
#include "cloud/config.h"
#include "common/config.h"
#include "common/factory_creator.h"
#include "common/status.h"
#include "http/http_client.h"
#include "io/fs/file_system.h"
#include "io/fs/local_file_system.h"
#include "runtime/exec_env.h"
#include "runtime/plugin/cloud_plugin_downloader.h"
#include "util/defer_op.h"
#include "util/dynamic_util.h"
#include "util/md5.h"
#include "util/string_util.h"
namespace doris {
// Number of shard sub-directories ("0" .. "127") kept under the library directory.
// A library is stored in the shard given by function_id % kLibShardNum.
static const int kLibShardNum = 128;
// Cache entry for one user function library. It records where the library lives
// on disk and how far it has progressed through download / unzip / load.
struct UserFunctionCacheEntry {
    ENABLE_FACTORY_CREATOR(UserFunctionCacheEntry);
    UserFunctionCacheEntry(int64_t fid_, const std::string& checksum_, const std::string& lib_file_,
                           LibType type)
            : function_id(fid_), checksum(checksum_), lib_file(lib_file_), type(type) {}
    ~UserFunctionCacheEntry();

    // Human-readable summary of this entry, used in log and error messages.
    std::string debug_string() {
        return fmt::format(
                " the info of UserFunctionCacheEntry save in BE, function_id:{}, "
                "checksum:{}, lib_file:{}, is_downloaded:{}. ",
                function_id, checksum, lib_file, is_downloaded);
    }

    int64_t function_id = 0;
    // Expected checksum of the library; a downloaded file is compared against it.
    std::string checksum;
    // Path of the library on disk. For a python zip it is replaced by the
    // unzipped directory once the archive has been extracted.
    std::string lib_file;
    // True once the shared library has been opened. Atomic so that it can be
    // read without taking load_lock.
    std::atomic<bool> is_loaded {false};
    // When true, the destructor unlinks lib_file.
    std::atomic<bool> should_delete_library {false};
    // Serializes download / unzip / load of this entry.
    std::mutex load_lock;
    // The entry is inserted into the cache map before its library is
    // downloaded (to keep the cache lock short); this tells whether the
    // download has completed.
    bool is_downloaded = false;
    // Whether the zip file has already been extracted.
    bool is_unziped = false;
    // Handle of the opened library, used to look up symbols.
    void* lib_handle = nullptr;
    // Maps a symbol name to its function pointer.
    std::unordered_map<std::string, void*> fptr_map;
    // Kind of library this entry refers to.
    LibType type;
};
// Releases what the entry holds: the opened library handle and, when
// should_delete_library is set, the library file on disk.
UserFunctionCacheEntry::~UserFunctionCacheEntry() {
    if (lib_handle) {
        // the library was opened earlier, close it
        dynamic_close(lib_handle);
        lib_handle = nullptr;
    }
    const bool remove_file = should_delete_library.load();
    if (remove_file) {
        // the result of unlink is intentionally not inspected
        unlink(lib_file.c_str());
    }
}
// Construction does no work of its own; init() assigns the library directory
// and registers the libraries already cached on disk.
UserFunctionCache::UserFunctionCache() = default;
// Drops every cached entry while holding the cache lock. An entry is destroyed
// here if the map held the last reference to it.
UserFunctionCache::~UserFunctionCache() {
    std::lock_guard<std::mutex> guard(_cache_lock);
    _entry_map.clear();
}
// Returns the cache owned by the process-wide ExecEnv.
UserFunctionCache* UserFunctionCache::instance() {
    ExecEnv* const exec_env = ExecEnv::GetInstance();
    return exec_env->user_function_cache();
}
// Sets the library directory, opens a handle to the current process and then
// registers every library already present under `lib_dir`.
// Returns the first error encountered.
Status UserFunctionCache::init(const std::string& lib_dir) {
#ifndef BE_TEST
    // Unit tests may reuse _lib_dir between cases, so this check is compiled
    // out when building tests.
    DCHECK(_lib_dir.empty()) << _lib_dir;
#endif
    _lib_dir = lib_dir;
    // step 1: dynamic open of the current process
    RETURN_IF_ERROR(dynamic_open(nullptr, &_current_process_handle));
    // step 2: pick up all libraries cached on disk
    return _load_cached_lib();
}
// Registers one cached library file found on disk.
//
// `file` is expected to be named function_id.checksum[.file_name].file_type and
// to end with ".so", ".jar" or ".zip" (a zip must also pass
// _check_cache_is_python_udf). On success an entry marked as downloaded is
// added to _entry_map.
//
// Returns InternalError when the file type is unknown, the name cannot be
// split, the function id is not a valid integer, or an entry with the same
// function id already exists.
Status UserFunctionCache::_load_entry_from_lib(const std::string& dir, const std::string& file) {
    LibType lib_type;
    if (ends_with(file, ".so")) {
        lib_type = LibType::SO;
    } else if (ends_with(file, ".jar")) {
        lib_type = LibType::JAR;
    } else if (ends_with(file, ".zip") && _check_cache_is_python_udf(dir, file).ok()) {
        // The Status is tested explicitly: a zip only counts as a python UDF
        // when the check succeeded.
        lib_type = LibType::PY_ZIP;
    } else {
        return Status::InternalError(
                "unknown library file format. the file type is not end with xxx.jar or xxx.so"
                " or xxx.zip : " +
                file);
    }

    std::vector<std::string> split_parts = _split_string_by_checksum(file);
    if (split_parts.size() != 3 && split_parts.size() != 4) {
        return Status::InternalError(
                "user function's name should be function_id.checksum[.file_name].file_type, now "
                "the all split parts are by delimiter(.): " +
                file);
    }

    // Parse the id without exceptions: a stray file such as "abc.def.jar" in the
    // cache directory must be reported as a bad entry, not throw out of the scan.
    const std::string& id_text = split_parts[0];
    const char* const id_end = id_text.data() + id_text.size();
    int64_t function_id = 0;
    const auto [parsed_end, parse_ec] = std::from_chars(id_text.data(), id_end, function_id);
    if (parse_ec != std::errc() || parsed_end != id_end) {
        return Status::InternalError(
                "user function's library file name should start with a numeric function_id: " +
                file);
    }
    std::string checksum = split_parts[1];

    auto it = _entry_map.find(function_id);
    if (it != _entry_map.end()) {
        LOG(WARNING) << "meet a same function id user function library, function_id=" << function_id
                     << ", one_checksum=" << checksum
                     << ", other_checksum info: = " << it->second->debug_string();
        return Status::InternalError("duplicate function id");
    }

    // create a cache entry and put it into entry map; the file is already on
    // disk so it is marked as downloaded
    std::shared_ptr<UserFunctionCacheEntry> entry = UserFunctionCacheEntry::create_shared(
            function_id, checksum, dir + "/" + file, lib_type);
    entry->is_downloaded = true;
    _entry_map[function_id] = entry;
    return Status::OK();
}
// Makes sure the library directory and its kLibShardNum shard sub-directories
// exist, then tries to register every regular file found in each shard.
// A file that fails to register is logged and skipped; directory creation or
// iteration errors are returned.
Status UserFunctionCache::_load_cached_lib() {
    const auto& local_fs = io::global_local_filesystem();
    // create library directory if not exist
    RETURN_IF_ERROR(local_fs->create_directory(_lib_dir));

    for (int shard = 0; shard < kLibShardNum; ++shard) {
        const std::string shard_dir = _lib_dir + "/" + std::to_string(shard);
        RETURN_IF_ERROR(local_fs->create_directory(shard_dir));

        // Always returns true so that one bad file does not stop the scan.
        auto register_file = [this, &shard_dir](const io::FileInfo& info) {
            if (info.is_file) {
                const Status load_st = _load_entry_from_lib(shard_dir, info.file_name);
                if (!load_st.ok()) {
                    LOG(WARNING) << "load a library failed, dir=" << shard_dir
                                 << ", file=" << info.file_name << ": " << load_st.to_string();
                }
            }
            return true;
        };
        RETURN_IF_ERROR(local_fs->iterate_directory(shard_dir, register_file));
    }
    return Status::OK();
}
// Looks up the entry for `fid`, creating and registering a new one when it is
// missing, then makes sure the library behind it is loaded.
// On success `output_entry` receives the entry. On failure the entry is removed
// from the cache and the load error is returned.
Status UserFunctionCache::_get_cache_entry(int64_t fid, const std::string& url,
                                           const std::string& checksum,
                                           std::shared_ptr<UserFunctionCacheEntry>& output_entry,
                                           LibType type) {
    const std::string file_name = _get_file_name_from_url(url);
    std::shared_ptr<UserFunctionCacheEntry> entry;
    {
        // Only the map lookup / insertion happens under the cache lock.
        std::lock_guard<std::mutex> guard(_cache_lock);
        if (auto found = _entry_map.find(fid); found != _entry_map.end()) {
            entry = found->second;
        } else {
            entry = UserFunctionCacheEntry::create_shared(
                    fid, checksum, _make_lib_file(fid, checksum, type, file_name), type);
            _entry_map.emplace(fid, entry);
        }
    }

    Status load_st = _load_cache_entry(url, entry);
    if (load_st.ok()) {
        output_entry = entry;
        return Status::OK();
    }

    LOG(WARNING) << "fail to load cache entry, fid=" << fid << " " << file_name << " " << url;
    // A failed load drops the entry from the cache, even if it had been valid
    // before.
    _destroy_cache_entry(entry);
    return load_st;
}
// Removes `entry` from the cache map and flags its library file for deletion.
void UserFunctionCache::_destroy_cache_entry(std::shared_ptr<UserFunctionCacheEntry> entry) {
    std::lock_guard<std::mutex> guard(_cache_lock);
    // With the flag set, the entry's destructor unlinks the library file once
    // the last reference to the entry is gone.
    entry->should_delete_library.store(true);
    const int64_t fid = entry->function_id;
    _entry_map.erase(fid);
}
// Brings `entry` to a usable state: downloads the library if needed, unzips a
// python zip once, and opens the library when it is a shared object.
//
// The work is serialized per entry through entry->load_lock. Returns the first
// error from download / unzip / open, or InvalidArgument for an unknown
// library type.
Status UserFunctionCache::_load_cache_entry(const std::string& url,
                                            std::shared_ptr<UserFunctionCacheEntry> entry) {
    // Cheap check without the lock for entries that are already loaded.
    if (entry->is_loaded.load()) {
        return Status::OK();
    }

    std::unique_lock<std::mutex> l(entry->load_lock);
    // Check again now that the lock is held: another thread may have finished
    // loading while this one was waiting, and opening the library a second time
    // would overwrite lib_handle.
    if (entry->is_loaded.load()) {
        return Status::OK();
    }

    if (!entry->is_downloaded) {
        RETURN_IF_ERROR(_download_lib(url, entry));
    }

    if (!entry->is_unziped && entry->type == LibType::PY_ZIP) {
        RETURN_IF_ERROR(_unzip_lib(entry->lib_file));
        // From now on lib_file names the unzipped directory: the path with its
        // last four characters (".zip") removed.
        entry->lib_file = entry->lib_file.substr(0, entry->lib_file.size() - 4);
        entry->is_unziped = true;
    }

    if (entry->type == LibType::SO) {
        RETURN_IF_ERROR(_load_cache_entry_internal(entry));
    } else if (entry->type != LibType::JAR && entry->type != LibType::PY_ZIP) {
        return Status::InvalidArgument(
                "Unsupported lib type! Make sure your lib type is one of 'so' and 'jar' and "
                "python 'zip'!");
    }
    return Status::OK();
}
// Unzips dir/file and checks that the top level of the extracted directory
// holds at least one regular file ending in ".py".
// Returns OK when such a file exists, otherwise the unzip / iteration error or
// InternalError when no python file was found.
Status UserFunctionCache::_check_cache_is_python_udf(const std::string& dir,
                                                     const std::string& file) {
    const std::string zip_path = dir + "/" + file;
    RETURN_IF_ERROR(_unzip_lib(zip_path));

    // the extracted directory is the zip path without its last four characters
    const std::string extracted_dir = zip_path.substr(0, zip_path.size() - 4);

    bool found_python_file = false;
    auto look_for_python = [&found_python_file](const io::FileInfo& info) {
        const bool is_python = info.is_file && ends_with(info.file_name, ".py");
        if (is_python) {
            found_python_file = true;
        }
        // returning false stops the iteration once a python file is found
        return !is_python;
    };
    RETURN_IF_ERROR(
            io::global_local_filesystem()->iterate_directory(extracted_dir, look_for_python));

    if (found_python_file) {
        return Status::OK();
    }
    return Status::InternalError("No Python file found in the unzipped directory.");
}
// Returns true when `entry_name` is absolute or contains a ".." path component,
// i.e. when appending it to the extraction directory could name a path outside
// of that directory.
static bool zip_entry_escapes_dir(const std::string& entry_name) {
    if (!entry_name.empty() && entry_name.front() == '/') {
        return true;
    }
    size_t begin = 0;
    while (begin <= entry_name.size()) {
        size_t end = entry_name.find('/', begin);
        if (end == std::string::npos) {
            end = entry_name.size();
        }
        if (entry_name.compare(begin, end - begin, "..") == 0) {
            return true;
        }
        begin = end + 1;
    }
    return false;
}

// Extracts `zip_file` into a directory named like the zip file without its last
// four characters (the ".zip" suffix).
//
// Entries whose name contains "__MACOSX" are skipped. Entries ending with '/'
// become directories, all others are written as files.
//
// Returns InternalError when the archive cannot be read, when an entry name is
// empty, too long for the name buffer or would leave the extraction directory,
// or when a file cannot be created or fully written.
Status UserFunctionCache::_unzip_lib(const std::string& zip_file) {
    std::string unzip_dir = zip_file.substr(0, zip_file.size() - 4);
    RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(unzip_dir));

    unzFile zip_file_handle = unzOpen(zip_file.c_str());
    if (zip_file_handle == nullptr) {
        return Status::InternalError("Failed to open zip file: " + zip_file);
    }
    Defer defer([&] { unzClose(zip_file_handle); });

    unz_global_info global_info;
    if (unzGetGlobalInfo(zip_file_handle, &global_info) != UNZ_OK) {
        return Status::InternalError("Failed to get global info from zip file: " + zip_file);
    }

    auto file_closer = [](FILE* fp) { fclose(fp); };
    for (uLong i = 0; i < global_info.number_entry; ++i) {
        // The archive is positioned on its first entry after opening, so only
        // later iterations need to advance.
        if (i > 0 && unzGoToNextFile(zip_file_handle) != UNZ_OK) {
            return Status::InternalError("Failed to go to next file in zip: " + zip_file);
        }

        unz_file_info file_info;
        char filename[256];
        if (unzGetCurrentFileInfo(zip_file_handle, &file_info, filename, sizeof(filename), nullptr,
                                  0, nullptr, 0) != UNZ_OK) {
            return Status::InternalError("Failed to get file info from zip file: " + zip_file);
        }
        // A name that does not fit the buffer is truncated and may be left
        // without a terminating NUL, so terminate it before reading it.
        const bool name_truncated = file_info.size_filename >= sizeof(filename);
        filename[sizeof(filename) - 1] = '\0';
        const std::string entry_name(filename);

        if (entry_name.find("__MACOSX") != std::string::npos) {
            continue;
        }
        if (entry_name.empty() || name_truncated) {
            return Status::InternalError("Invalid file name in zip file: " + zip_file);
        }
        // Refuse names that would be written outside of unzip_dir.
        if (zip_entry_escapes_dir(entry_name)) {
            return Status::InternalError("Unsafe file path in zip: " + entry_name);
        }

        std::string full_filename = unzip_dir + "/" + entry_name;
        if (full_filename.length() > PATH_MAX) {
            return Status::InternalError(
                    fmt::format("File path {}... is too long, maximum path length is {}",
                                full_filename.substr(0, 50), PATH_MAX));
        }

        if (entry_name.back() == '/') {
            RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(full_filename));
            continue;
        }

        if (unzOpenCurrentFile(zip_file_handle) != UNZ_OK) {
            return Status::InternalError("Failed to open file in zip: " + entry_name);
        }
        // Declared before `out` so that the output file is closed first on every
        // exit path, then the zip entry.
        Defer close_entry([&] { unzCloseCurrentFile(zip_file_handle); });

        std::unique_ptr<FILE, decltype(file_closer)> out(fopen(full_filename.c_str(), "wb"),
                                                         file_closer);
        if (out == nullptr) {
            return Status::InternalError("Failed to create file: " + full_filename);
        }

        char buffer[8192];
        int bytes_read = 0;
        while ((bytes_read = unzReadCurrentFile(zip_file_handle, buffer, sizeof(buffer))) > 0) {
            const size_t chunk = static_cast<size_t>(bytes_read);
            if (fwrite(buffer, 1, chunk, out.get()) != chunk) {
                return Status::InternalError("Failed to write file: " + full_filename);
            }
        }
        if (bytes_read < 0) {
            return Status::InternalError("Failed to read file in zip: " + entry_name);
        }
        // Close explicitly so that an error while flushing buffered data is seen.
        if (fclose(out.release()) != 0) {
            return Status::InternalError("Failed to write file: " + full_filename);
        }
    }
    return Status::OK();
}
// entry's lock must be held
//
// Downloads the library for `entry` from `url` into "<lib_file>.tmp", checks its
// MD5 against entry->checksum (case-insensitively) and renames it to
// entry->lib_file. On success entry->is_downloaded is set.
//
// Returns an error when the temporary file cannot be opened, written or closed,
// the url cannot be resolved, the download fails, the checksum differs or the
// rename fails. The temporary file is removed on every failure path.
Status UserFunctionCache::_download_lib(const std::string& url,
                                        std::shared_ptr<UserFunctionCacheEntry> entry) {
    DCHECK(!entry->is_downloaded);

    // get local path to save library
    std::string tmp_file = entry->lib_file + ".tmp";
    auto fp_closer = [](FILE* fp) { fclose(fp); };
    std::unique_ptr<FILE, decltype(fp_closer)> fp(fopen(tmp_file.c_str(), "w"), fp_closer);
    if (fp == nullptr) {
        LOG(WARNING) << "fail to open file, file=" << tmp_file;
        return Status::InternalError("fail to open file");
    }
    // Do not leave a partial or rejected download behind.
    bool tmp_file_installed = false;
    Defer remove_tmp_file([&] {
        if (!tmp_file_installed) {
            unlink(tmp_file.c_str());
        }
    });

    std::string real_url;
    RETURN_IF_ERROR(_get_real_url(url, &real_url));

    Md5Digest digest;
    HttpClient client;
    int64_t file_size = 0;
    RETURN_IF_ERROR(client.init(real_url));
    Status status;
    auto download_cb = [&status, &tmp_file, &fp, &digest, &file_size](const void* data,
                                                                      size_t length) {
        digest.update(data, length);
        file_size = file_size + length;
        // Count bytes rather than one item of `length` bytes, so that an empty
        // chunk is not mistaken for a write failure.
        auto written = fwrite(data, 1, length, fp.get());
        if (written != length) {
            LOG(WARNING) << "fail to write data to file, file=" << tmp_file
                         << ", error=" << ferror(fp.get());
            status = Status::InternalError("fail to write data when download");
            return false;
        }
        return true;
    };
    RETURN_IF_ERROR(client.execute(download_cb));
    RETURN_IF_ERROR(status);

    digest.digest();
    if (!iequal(digest.hex(), entry->checksum)) {
        fmt::memory_buffer error_msg;
        fmt::format_to(error_msg,
                       " The checksum is not equal of {}. The init info of first create entry is:"
                       "{} But download file check_sum is: {}, file_size is: {}.",
                       url, entry->debug_string(), digest.hex(), file_size);
        std::string error(fmt::to_string(error_msg));
        LOG(WARNING) << error;
        return Status::InternalError(error);
    }

    // close this file; a failure here means buffered data did not reach the
    // file, so the download must not be installed
    if (fclose(fp.release()) != 0) {
        LOG(WARNING) << "fail to close file, file=" << tmp_file << ", errno=" << errno;
        return Status::InternalError("fail to write data when download");
    }

    // rename temporary file to library file
    auto ret = rename(tmp_file.c_str(), entry->lib_file.c_str());
    if (ret != 0) {
        char buf[64];
        LOG(WARNING) << "fail to rename file from=" << tmp_file << ", to=" << entry->lib_file
                     << ", errno=" << errno << ", errmsg=" << strerror_r(errno, buf, 64);
        return Status::InternalError("fail to rename file");
    }
    tmp_file_installed = true;

    // check download
    entry->is_downloaded = true;
    return Status::OK();
}
// Returns the part of `url` after its last '/', or the whole `url` when it
// contains no '/'.
std::string UserFunctionCache::_get_file_name_from_url(const std::string& url) const {
    const size_t slash = url.rfind('/');
    if (slash == std::string::npos) {
        return url;
    }
    return url.substr(slash + 1);
}
// entry's lock must be held
// Opens the entry's library file, stores the handle in entry->lib_handle and
// marks the entry as loaded. Returns the error from dynamic_open on failure.
Status UserFunctionCache::_load_cache_entry_internal(
        std::shared_ptr<UserFunctionCacheEntry> entry) {
    void** const handle_slot = &entry->lib_handle;
    RETURN_IF_ERROR(dynamic_open(entry->lib_file.c_str(), handle_slot));
    entry->is_loaded.store(true);
    return Status::OK();
}
// Builds the on-disk path of a library:
//   <lib_dir>/<function_id % kLibShardNum>/<function_id>.<checksum>.<suffix>
// where <suffix> is `file_name` for JAR and PY_ZIP libraries and "so" for
// every other type.
std::string UserFunctionCache::_make_lib_file(int64_t function_id, const std::string& checksum,
                                              LibType type, const std::string& file_name) {
    const int shard = function_id % kLibShardNum;
    const bool keeps_file_name = (type == LibType::JAR || type == LibType::PY_ZIP);

    std::stringstream path;
    path << _lib_dir << '/' << shard << '/' << function_id << '.' << checksum << '.';
    if (keeps_file_name) {
        path << file_name;
    } else {
        path << "so";
    }
    return path.str();
}
// Resolves the JAR library of function `fid` and stores its local path in
// `*libpath`. Returns the error from _get_cache_entry on failure.
Status UserFunctionCache::get_jarpath(int64_t fid, const std::string& url,
                                      const std::string& checksum, std::string* libpath) {
    std::shared_ptr<UserFunctionCacheEntry> jar_entry;
    RETURN_IF_ERROR(_get_cache_entry(fid, url, checksum, jar_entry, LibType::JAR));
    libpath->assign(jar_entry->lib_file);
    return Status::OK();
}
// Resolves the python zip library of function `fid` and stores its local path
// in `*libpath`. Returns the error from _get_cache_entry on failure.
Status UserFunctionCache::get_pypath(int64_t fid, const std::string& url,
                                     const std::string& checksum, std::string* libpath) {
    std::shared_ptr<UserFunctionCacheEntry> py_entry;
    RETURN_IF_ERROR(_get_cache_entry(fid, url, checksum, py_entry, LibType::PY_ZIP));
    libpath->assign(py_entry->lib_file);
    return Status::OK();
}
// Splits a library file name of the form
//   function_id.checksum[.file_name].file_type
// into its parts.
//
// The first two dots delimit function_id and checksum, the last dot delimits
// file_type, and whatever lies between the second and the last dot is
// file_name (it may itself contain dots).
//
// Returns 4 parts when a file_name section is present, 3 parts when the name is
// just function_id.checksum.file_type, and an empty vector when the name has
// fewer than two dots.
std::vector<std::string> UserFunctionCache::_split_string_by_checksum(const std::string& file) {
    // Find the first dot from the start
    const size_t first_dot = file.find('.');
    if (first_dot == std::string::npos) {
        return {};
    }
    // Find the second dot starting after the first one
    const size_t second_dot = file.find('.', first_dot + 1);
    if (second_dot == std::string::npos) {
        return {};
    }
    // The last dot always exists here and is never before the second dot.
    const size_t last_dot = file.rfind('.');

    std::vector<std::string> parts;
    parts.reserve(4);
    parts.push_back(file.substr(0, first_dot));
    parts.push_back(file.substr(first_dot + 1, second_dot - first_dot - 1));
    if (last_dot > second_dot) {
        // optional file_name section
        parts.push_back(file.substr(second_dot + 1, last_dot - second_dot - 1));
    }
    parts.push_back(file.substr(last_dot + 1));
    return parts;
}
// Resolves the url to download from. A `url` containing ":/" is used as is;
// anything else is resolved through _check_and_return_default_java_udf_url.
Status UserFunctionCache::_get_real_url(const std::string& url, std::string* result_url) {
    const bool has_scheme = url.find(":/") != std::string::npos;
    if (has_scheme) {
        *result_url = url;
        return Status::OK();
    }
    return _check_and_return_default_java_udf_url(url, result_url);
}
// Resolves a `url` without a scheme against $DORIS_HOME/plugins/java_udf.
//
// In cloud mode the UDF is downloaded from cloud first and `*result_url` is the
// "file://" url of the downloaded path; a failed download is an error.
// Otherwise `*result_url` is "file://$DORIS_HOME/plugins/java_udf/<url>",
// whether or not that file exists.
//
// Returns InternalError when DORIS_HOME is not set and RuntimeError when the
// cloud download fails.
Status UserFunctionCache::_check_and_return_default_java_udf_url(const std::string& url,
                                                                 std::string* result_url) {
    const char* doris_home = std::getenv("DORIS_HOME");
    // getenv returns nullptr for an unset variable, and building a std::string
    // from nullptr is undefined behavior.
    if (doris_home == nullptr) {
        return Status::InternalError("DORIS_HOME is not set, cannot resolve UDF url: " + url);
    }
    std::string default_url = std::string(doris_home) + "/plugins/java_udf";

    // In cloud mode, always try cloud download first (prioritize cloud mode)
    if (config::is_cloud_mode()) {
        std::string target_path = default_url + "/" + url;
        std::string downloaded_path;
        Status status = CloudPluginDownloader::download_from_cloud(
                CloudPluginDownloader::PluginType::JAVA_UDF, url, target_path, &downloaded_path);
        if (status.ok() && !downloaded_path.empty()) {
            *result_url = "file://" + downloaded_path;
            return Status::OK();
        } else {
            LOG(WARNING) << "Failed to download Java UDF from cloud: " << status.to_string();
            return Status::RuntimeError(
                    "Cannot download Java UDF from cloud: {}. "
                    "Please retry later or check your UDF has been uploaded to cloud.",
                    url);
        }
    }

    // Return the file path regardless of whether it exists (original UDF behavior)
    *result_url = "file://" + default_url + "/" + url;
    return Status::OK();
}
} // namespace doris