blob: 6b24dc1efc1e18e4ac7622176a659302cc267c9d [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/plugin/cloud_plugin_downloader.h"
#include <fmt/format.h>
#include "cloud/cloud_storage_engine.h"
#include "io/fs/local_file_system.h"
#include "io/fs/remote_file_system.h"
#include "runtime/exec_env.h"
namespace doris {
// Use 10MB buffer for all downloads - same as cloud_warm_up_manager
static constexpr size_t DOWNLOAD_BUFFER_SIZE = 10 * 1024 * 1024; // 10MB
// Static mutex definition for synchronizing downloads
std::mutex CloudPluginDownloader::_download_mutex;
Status CloudPluginDownloader::download_from_cloud(PluginType type, const std::string& name,
const std::string& local_path,
std::string* result_path) {
// Use lock_guard to synchronize concurrent downloads
std::lock_guard<std::mutex> lock(_download_mutex);
if (name.empty()) {
return Status::InvalidArgument("Plugin name cannot be empty");
}
CloudPluginDownloader downloader;
// 1. Get FileSystem
io::RemoteFileSystemSPtr filesystem;
RETURN_IF_ERROR(downloader._get_cloud_filesystem(&filesystem));
// 2. Build remote plugin path
std::string remote_path;
RETURN_IF_ERROR(downloader._build_plugin_path(type, name, &remote_path));
LOG(INFO) << "Downloading plugin: " << name << " -> " << local_path;
// 3. Prepare local environment
RETURN_IF_ERROR(downloader._prepare_local_path(local_path));
// 4. Download remote file to local
RETURN_IF_ERROR(downloader._download_remote_file(filesystem, remote_path, local_path));
*result_path = local_path;
LOG(INFO) << "Successfully downloaded plugin: " << name << " to " << local_path;
return Status::OK();
}
Status CloudPluginDownloader::_build_plugin_path(PluginType type, const std::string& name,
std::string* path) {
std::string type_name;
switch (type) {
case PluginType::JDBC_DRIVERS:
type_name = "jdbc_drivers";
break;
case PluginType::JAVA_UDF:
type_name = "java_udf";
break;
default:
return Status::InvalidArgument("Unsupported plugin type: {}", static_cast<int>(type));
}
*path = fmt::format("plugins/{}/{}", type_name, name);
return Status::OK();
}
Status CloudPluginDownloader::_get_cloud_filesystem(io::RemoteFileSystemSPtr* filesystem) {
BaseStorageEngine& base_engine = ExecEnv::GetInstance()->storage_engine();
CloudStorageEngine* cloud_engine = dynamic_cast<CloudStorageEngine*>(&base_engine);
if (!cloud_engine) {
return Status::NotFound("CloudStorageEngine not found, not in cloud mode");
}
*filesystem = cloud_engine->latest_fs();
if (!*filesystem) {
return Status::NotFound("No latest filesystem available in cloud mode");
}
return Status::OK();
}
Status CloudPluginDownloader::_prepare_local_path(const std::string& local_path) {
// Remove existing file if present
bool exists = false;
RETURN_IF_ERROR(io::global_local_filesystem()->exists(local_path, &exists));
if (exists) {
RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(local_path));
LOG(INFO) << "Removed existing file: " << local_path;
}
// Ensure local directory exists
std::string dir_path = local_path.substr(0, local_path.find_last_of('/'));
if (!dir_path.empty()) {
RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(dir_path));
}
return Status::OK();
}
Status CloudPluginDownloader::_download_remote_file(io::RemoteFileSystemSPtr filesystem,
const std::string& remote_path,
const std::string& local_path) {
// Open remote file for reading
io::FileReaderSPtr remote_reader;
io::FileReaderOptions opts;
RETURN_IF_ERROR(filesystem->open_file(remote_path, &remote_reader, &opts));
// Get file size
int64_t file_size;
RETURN_IF_ERROR(filesystem->file_size(remote_path, &file_size));
// Create local file writer
io::FileWriterPtr local_writer;
RETURN_IF_ERROR(io::global_local_filesystem()->create_file(local_path, &local_writer));
auto buffer = std::make_unique<char[]>(DOWNLOAD_BUFFER_SIZE);
size_t total_read = 0;
while (total_read < file_size) {
size_t to_read =
std::min(DOWNLOAD_BUFFER_SIZE, static_cast<size_t>(file_size - total_read));
size_t bytes_read;
RETURN_IF_ERROR(remote_reader->read_at(total_read, {buffer.get(), to_read}, &bytes_read));
RETURN_IF_ERROR(local_writer->append({buffer.get(), bytes_read}));
total_read += bytes_read;
}
RETURN_IF_ERROR(local_writer->close());
return Status::OK();
}
} // namespace doris