| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "io/fs/s3_file_system.h" |
| |
| #include <fmt/format.h> |
| |
| #include <cstddef> |
| |
| #include "common/compiler_util.h" // IWYU pragma: keep |
| // IWYU pragma: no_include <bits/chrono.h> |
| #include <aws/core/utils/threading/Executor.h> |
| #include <aws/s3/S3Client.h> |
| |
| #include <chrono> // IWYU pragma: keep |
| #include <filesystem> |
| #include <fstream> // IWYU pragma: keep |
| #include <future> |
| #include <memory> |
| |
| #include "common/config.h" |
| #include "common/logging.h" |
| #include "common/status.h" |
| #include "io/fs/err_utils.h" |
| #include "io/fs/file_system.h" |
| #include "io/fs/file_writer.h" |
| #include "io/fs/local_file_system.h" |
| #include "io/fs/remote_file_system.h" |
| #include "io/fs/s3_common.h" |
| #include "io/fs/s3_file_reader.h" |
| #include "io/fs/s3_file_writer.h" |
| #include "io/fs/s3_obj_storage_client.h" |
| #include "runtime/exec_env.h" |
| #include "runtime/thread_context.h" |
| #include "util/s3_uri.h" |
| #include "util/s3_util.h" |
| |
| namespace doris::io { |
| namespace { |
| constexpr std::string_view OSS_PRIVATE_ENDPOINT_SUFFIX = "-internal.aliyuncs.com"; |
| constexpr int LEN_OF_OSS_PRIVATE_SUFFIX = 9; // length of "-internal" |
| |
| #ifndef CHECK_S3_CLIENT |
| #define CHECK_S3_CLIENT(client) \ |
| if (!client) { \ |
| return Status::InvalidArgument("init s3 client error"); \ |
| } |
| #endif |
| |
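// Extract the object key from a full S3 path via S3URI, e.g. (assuming
// S3URI's usual semantics) "s3://bucket/path/to/file" yields "path/to/file".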
| Result<std::string> get_key(const Path& full_path) { |
| // FIXME(plat1ko): Check bucket in full path and support relative path |
| S3URI uri(full_path.native()); |
| RETURN_IF_ERROR_RESULT(uri.parse()); |
| return uri.get_key(); |
| } |
| |
| } // namespace |
| |
| ObjClientHolder::ObjClientHolder(S3ClientConf conf) : _conf(std::move(conf)) {} |
| |
| ObjClientHolder::~ObjClientHolder() = default; |
| |
| Status ObjClientHolder::init() { |
| _client = S3ClientFactory::instance().create(_conf); |
| if (!_client) { |
| return Status::InvalidArgument("failed to init s3 client with conf {}", _conf.to_string()); |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status ObjClientHolder::reset(const S3ClientConf& conf) { |
| S3ClientConf reset_conf; |
| { |
| std::shared_lock lock(_mtx); |
| if (conf.get_hash() == _conf.get_hash()) { |
| return Status::OK(); // Same conf |
| } |
| |
| reset_conf = _conf; |
| reset_conf.ak = conf.ak; |
| reset_conf.sk = conf.sk; |
| reset_conf.token = conf.token; |
| reset_conf.bucket = conf.bucket; |
| reset_conf.connect_timeout_ms = conf.connect_timeout_ms; |
| reset_conf.max_connections = conf.max_connections; |
| reset_conf.request_timeout_ms = conf.request_timeout_ms; |
| reset_conf.use_virtual_addressing = conf.use_virtual_addressing; |
| |
| reset_conf.role_arn = conf.role_arn; |
| reset_conf.external_id = conf.external_id; |
| reset_conf.cred_provider_type = conf.cred_provider_type; |
// TODO: should the endpoint also be checked/updated here?
| } |
| |
| auto client = S3ClientFactory::instance().create(reset_conf); |
| if (!client) { |
| return Status::InvalidArgument("failed to init s3 client with conf {}", conf.to_string()); |
| } |
| |
| LOG(WARNING) << "reset s3 client with new conf: " << conf.to_string(); |
| |
| { |
| std::lock_guard lock(_mtx); |
| _client = std::move(client); |
| _conf = std::move(reset_conf); |
| } |
| |
| return Status::OK(); |
| } |
| |
| Result<int64_t> ObjClientHolder::object_file_size(const std::string& bucket, |
| const std::string& key) const { |
| auto client = get(); |
| if (!client) { |
| return ResultError(Status::InvalidArgument("init s3 client error")); |
| } |
| |
| auto resp = client->head_object({ |
| .bucket = bucket, |
| .key = key, |
| }); |
| |
| if (resp.resp.status.code != ErrorCode::OK) { |
| return ResultError(std::move(Status(resp.resp.status.code, std::move(resp.resp.status.msg)) |
| .append(fmt::format("failed to head s3 file {}", |
| full_s3_path(bucket, key))))); |
| } |
| |
| return resp.file_size; |
| } |
| |
| std::string ObjClientHolder::full_s3_path(std::string_view bucket, std::string_view key) const { |
| return fmt::format("{}/{}/{}", _conf.endpoint, bucket, key); |
| } |
| |
| std::string S3FileSystem::full_s3_path(std::string_view key) const { |
| return _client->full_s3_path(_bucket, key); |
| } |
| |
| Result<std::shared_ptr<S3FileSystem>> S3FileSystem::create(S3Conf s3_conf, std::string id) { |
| std::shared_ptr<S3FileSystem> fs(new S3FileSystem(std::move(s3_conf), std::move(id))); |
| RETURN_IF_ERROR_RESULT(fs->init()); |
| return fs; |
| } |
| |
| S3FileSystem::S3FileSystem(S3Conf s3_conf, std::string id) |
| : RemoteFileSystem(s3_conf.prefix, std::move(id), FileSystemType::S3), |
| _bucket(std::move(s3_conf.bucket)), |
| _prefix(std::move(s3_conf.prefix)), |
| _client(std::make_shared<ObjClientHolder>(std::move(s3_conf.client_conf))) { |
// FIXME(plat1ko): Normalize prefix
// Strip all leading and trailing '/' characters from the prefix.
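// e.g. "/path/to/prefix/" becomes "path/to/prefix", and a prefix consisting
// only of '/' characters becomes empty.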
| if (!_prefix.empty()) { |
| size_t start = _prefix.find_first_not_of('/'); |
| if (start == std::string::npos) { |
| _prefix = ""; |
| } else { |
| size_t end = _prefix.find_last_not_of('/'); |
| if (start > 0 || end < _prefix.size() - 1) { |
| _prefix = _prefix.substr(start, end - start + 1); |
| } |
| } |
| } |
| } |
| |
| Status S3FileSystem::init() { |
| return _client->init(); |
| } |
| |
| S3FileSystem::~S3FileSystem() = default; |
| |
| Status S3FileSystem::create_file_impl(const Path& file, FileWriterPtr* writer, |
| const FileWriterOptions* opts) { |
| auto client = _client->get(); |
| CHECK_S3_CLIENT(client); |
| auto key = DORIS_TRY(get_key(file)); |
| *writer = std::make_unique<S3FileWriter>(_client, _bucket, std::move(key), opts); |
| return Status::OK(); |
| } |
| |
| Status S3FileSystem::open_file_internal(const Path& file, FileReaderSPtr* reader, |
| const FileReaderOptions& opts) { |
| auto key = DORIS_TRY(get_key(file)); |
| *reader = DORIS_TRY(S3FileReader::create(_client, _bucket, key, opts.file_size, nullptr)); |
| return Status::OK(); |
| } |
| |
| Status S3FileSystem::create_directory_impl(const Path& dir, bool failed_if_exists) { |
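// Object storage has no directory entries; a "directory" exists implicitly
// once any object is created under its prefix, so this is a no-op.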
| return Status::OK(); |
| } |
| |
| Status S3FileSystem::delete_file_impl(const Path& file) { |
| auto client = _client->get(); |
| CHECK_S3_CLIENT(client); |
| |
| auto key = DORIS_TRY(get_key(file)); |
| |
| auto resp = client->delete_object({.bucket = _bucket, .key = key}); |
| |
| if (resp.status.code == ErrorCode::OK || resp.status.code == ErrorCode::NOT_FOUND) { |
| return Status::OK(); |
| } |
| return std::move(Status(resp.status.code, std::move(resp.status.msg)) |
| .append(fmt::format("failed to delete file {}", full_s3_path(key)))); |
| } |
| |
| Status S3FileSystem::delete_directory_impl(const Path& dir) { |
| auto client = _client->get(); |
| CHECK_S3_CLIENT(client); |
| |
| auto prefix = DORIS_TRY(get_key(dir)); |
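// Ensure the prefix ends with '/' so only keys under this "directory" match,
// e.g. deleting "dir" must not also delete a sibling key "dir2/file".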
| if (!prefix.empty() && prefix.back() != '/') { |
| prefix.push_back('/'); |
| } |
| |
| auto resp = client->delete_objects_recursively({ |
| .path = full_s3_path(prefix), |
| .bucket = _bucket, |
| .prefix = prefix, |
| }); |
| return {resp.status.code, std::move(resp.status.msg)}; |
| } |
| |
| Status S3FileSystem::batch_delete_impl(const std::vector<Path>& remote_files) { |
| auto client = _client->get(); |
| CHECK_S3_CLIENT(client); |
| |
// A `DeleteObjectsRequest` can contain at most 1000 keys.
| constexpr size_t max_delete_batch = 1000; |
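// Delete in batches of up to 1000 keys, e.g. 2500 files are removed with
// three requests of 1000, 1000, and 500 keys.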
| auto path_iter = remote_files.begin(); |
| |
| do { |
| std::vector<std::string> objects; |
| auto path_begin = path_iter; |
| for (; path_iter != remote_files.end() && (path_iter - path_begin < max_delete_batch); |
| ++path_iter) { |
| auto key = DORIS_TRY(get_key(*path_iter)); |
| objects.emplace_back(std::move(key)); |
| } |
| if (objects.empty()) { |
| return Status::OK(); |
| } |
| // clang-format off |
| if (auto resp = client->delete_objects( {.bucket = _bucket,}, std::move(objects)); resp.status.code != ErrorCode::OK) { |
| return {resp.status.code, std::move(resp.status.msg)}; |
| } |
| // clang-format on |
| } while (path_iter != remote_files.end()); |
| |
| return Status::OK(); |
| } |
| |
| Status S3FileSystem::exists_impl(const Path& path, bool* res) const { |
| auto client = _client->get(); |
| CHECK_S3_CLIENT(client); |
| auto key = DORIS_TRY(get_key(path)); |
| |
| VLOG_DEBUG << "key:" << key << " path:" << path; |
| |
| auto resp = client->head_object({.bucket = _bucket, .key = key}); |
| |
| if (resp.resp.status.code == ErrorCode::OK) { |
| *res = true; |
| } else if (resp.resp.status.code == ErrorCode::NOT_FOUND) { |
| *res = false; |
| } else { |
| return std::move( |
| Status(resp.resp.status.code, std::move(resp.resp.status.msg)) |
| .append(fmt::format(" failed to check exists {}", full_s3_path(key)))); |
| } |
| return Status::OK(); |
| } |
| |
| Status S3FileSystem::file_size_impl(const Path& file, int64_t* file_size) const { |
| auto key = DORIS_TRY(get_key(file)); |
| *file_size = DORIS_TRY(_client->object_file_size(_bucket, key)); |
| return Status::OK(); |
| } |
| |
| Status S3FileSystem::list_impl(const Path& dir, bool only_file, std::vector<FileInfo>* files, |
| bool* exists) { |
// Object storage has no directory entries, so a directory path never exists
// as an object. Ignore that property and always report exists = true.
| *exists = true; |
| auto client = _client->get(); |
| CHECK_S3_CLIENT(client); |
| auto prefix = DORIS_TRY(get_key(dir)); |
| if (!prefix.empty() && prefix.back() != '/') { |
| prefix.push_back('/'); |
| } |
| |
| // clang-format off |
| auto resp = client->list_objects( {.bucket = _bucket, .prefix = prefix,}, files); |
| // clang-format on |
| if (resp.status.code == ErrorCode::OK) { |
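// Strip the listing prefix so callers get names relative to `dir`.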
| for (auto&& file : *files) { |
| file.file_name.erase(0, prefix.size()); |
| } |
| } |
| |
| return {resp.status.code, std::move(resp.status.msg)}; |
| } |
| |
| Status S3FileSystem::rename_impl(const Path& orig_name, const Path& new_name) { |
| return Status::NotSupported("S3FileSystem::rename_impl"); |
| } |
| |
| Status S3FileSystem::upload_impl(const Path& local_file, const Path& remote_file) { |
| auto client = _client->get(); |
| CHECK_S3_CLIENT(client); |
| |
| auto key = DORIS_TRY(get_key(remote_file)); |
| auto start = std::chrono::steady_clock::now(); |
| FileWriterPtr obj_writer; |
| RETURN_IF_ERROR(create_file_impl(key, &obj_writer, nullptr)); |
| FileReaderSPtr local_reader; |
| RETURN_IF_ERROR(io::global_local_filesystem()->open_file(local_file, &local_reader)); |
| size_t local_buffer_size = config::s3_file_system_local_upload_buffer_size; |
| std::unique_ptr<char[]> write_buffer = |
| std::make_unique_for_overwrite<char[]>(local_buffer_size); |
| size_t cur_read = 0; |
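// Stream the local file to the object writer in buffer-sized chunks, so the
// upload needs only s3_file_system_local_upload_buffer_size bytes of memory.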
| while (cur_read < local_reader->size()) { |
| size_t bytes_read = 0; |
| RETURN_IF_ERROR(local_reader->read_at( |
| cur_read, Slice {write_buffer.get(), local_buffer_size}, &bytes_read)); |
| RETURN_IF_ERROR(obj_writer->append({write_buffer.get(), bytes_read})); |
| cur_read += bytes_read; |
| } |
| RETURN_IF_ERROR(obj_writer->close()); |
| auto duration = std::chrono::duration<float>(std::chrono::steady_clock::now() - start); |
| |
| auto size = local_reader->size(); |
| LOG(INFO) << "Upload " << local_file.native() << " to " << full_s3_path(key) |
| << ", duration=" << duration.count() << ", bytes=" << size; |
| |
| return Status::OK(); |
| } |
| |
| Status S3FileSystem::batch_upload_impl(const std::vector<Path>& local_files, |
| const std::vector<Path>& remote_files) { |
| auto client = _client->get(); |
| CHECK_S3_CLIENT(client); |
| |
| if (local_files.size() != remote_files.size()) { |
| return Status::InvalidArgument("local_files.size({}) != remote_files.size({})", |
| local_files.size(), remote_files.size()); |
| } |
| |
| std::vector<FileWriterPtr> obj_writers(local_files.size()); |
| |
| auto upload_task = [&, this](size_t idx) { |
| SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->s3_file_buffer_tracker()); |
| const auto& local_file = local_files[idx]; |
| const auto& remote_file = remote_files[idx]; |
| auto& obj_writer = obj_writers[idx]; |
| auto key = DORIS_TRY(get_key(remote_file)); |
| LOG(INFO) << "Start to upload " << local_file.native() << " to " << full_s3_path(key); |
| RETURN_IF_ERROR(create_file_impl(key, &obj_writer, nullptr)); |
| FileReaderSPtr local_reader; |
| RETURN_IF_ERROR(io::global_local_filesystem()->open_file(local_file, &local_reader)); |
| size_t local_buffer_size = config::s3_file_system_local_upload_buffer_size; |
| std::unique_ptr<char[]> write_buffer = |
| std::make_unique_for_overwrite<char[]>(local_buffer_size); |
| size_t cur_read = 0; |
| while (cur_read < local_reader->size()) { |
| size_t bytes_read = 0; |
| RETURN_IF_ERROR(local_reader->read_at( |
| cur_read, Slice {write_buffer.get(), local_buffer_size}, &bytes_read)); |
| RETURN_IF_ERROR((*obj_writer).append({write_buffer.get(), bytes_read})); |
| cur_read += bytes_read; |
| } |
| RETURN_IF_ERROR((*obj_writer).close()); |
| return Status::OK(); |
| }; |
| |
| Status s = Status::OK(); |
| std::vector<std::future<Status>> futures; |
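// Each upload runs as a std::packaged_task on the S3 thread pool; its Status
// is retrieved through the paired std::future once all tasks are submitted.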
| for (int i = 0; i < local_files.size(); ++i) { |
| auto task = std::make_shared<std::packaged_task<Status(size_t idx)>>(upload_task); |
| futures.emplace_back(task->get_future()); |
| auto st = ExecEnv::GetInstance()->s3_file_system_thread_pool()->submit_func( |
| [t = std::move(task), idx = i]() mutable { (*t)(idx); }); |
// Don't return immediately on a failed submit: previously submitted tasks may
// still be running in the thread pool, so fall through and wait on all futures.
| if (!st.ok()) { |
| s = st; |
| break; |
| } |
| } |
| for (auto&& f : futures) { |
| auto cur_s = f.get(); |
| if (!cur_s.ok()) { |
| s = std::move(cur_s); |
| } |
| } |
| return s; |
| } |
| |
| Status S3FileSystem::download_impl(const Path& remote_file, const Path& local_file) { |
| auto client = _client->get(); |
| CHECK_S3_CLIENT(client); |
| auto key = DORIS_TRY(get_key(remote_file)); |
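// The whole object is buffered in memory before being written to the local
// file, so the peak memory usage of a download equals the object size.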
| int64_t size; |
| RETURN_IF_ERROR(file_size(remote_file, &size)); |
| std::unique_ptr<char[]> buf = std::make_unique_for_overwrite<char[]>(size); |
| size_t bytes_read = 0; |
| // clang-format off |
| auto resp = client->get_object( {.bucket = _bucket, .key = key,}, |
| buf.get(), 0, size, &bytes_read); |
| // clang-format on |
| if (resp.status.code != ErrorCode::OK) { |
| return {resp.status.code, std::move(resp.status.msg)}; |
| } |
| Aws::OFStream local_file_s; |
| local_file_s.open(local_file, std::ios::out | std::ios::binary); |
| if (local_file_s.good()) { |
| local_file_s << StringViewStream(buf.get(), size).rdbuf(); |
| } else { |
return localfs_error(errno, fmt::format("failed to open file {}", local_file.native()));
| } |
| |
| return Status::OK(); |
| } |
| |
// OSS provides both public and private endpoints; is_public_endpoint
// determines whether the presigned URL is generated for the public endpoint.
| std::string S3FileSystem::generate_presigned_url(const Path& path, int64_t expiration_secs, |
| bool is_public_endpoint) const { |
| std::string key = fmt::format("{}/{}", _prefix, path.native()); |
| std::shared_ptr<ObjStorageClient> client; |
| if (is_public_endpoint && |
| _client->s3_client_conf().endpoint.ends_with(OSS_PRIVATE_ENDPOINT_SUFFIX)) { |
| auto new_s3_conf = _client->s3_client_conf(); |
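// Erase the "-internal" part (LEN_OF_OSS_PRIVATE_SUFFIX chars) at the start
// of the private suffix, e.g. "oss-cn-hangzhou-internal.aliyuncs.com"
// becomes the public endpoint "oss-cn-hangzhou.aliyuncs.com".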
| new_s3_conf.endpoint.erase( |
| _client->s3_client_conf().endpoint.size() - OSS_PRIVATE_ENDPOINT_SUFFIX.size(), |
| LEN_OF_OSS_PRIVATE_SUFFIX); |
| client = S3ClientFactory::instance().create(new_s3_conf); |
| } else { |
| client = _client->get(); |
| } |
| return client->generate_presigned_url({.bucket = _bucket, .key = key}, expiration_secs, |
| _client->s3_client_conf()); |
| } |
| |
| } // namespace doris::io |