| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "io/fs/broker_file_system.h" |
| |
| #include <fmt/format.h> |
| #include <gen_cpp/PaloBrokerService_types.h> |
| #include <gen_cpp/TPaloBrokerService.h> |
| #include <gen_cpp/Types_types.h> |
| #include <glog/logging.h> |
| #include <thrift/Thrift.h> |
| #include <thrift/transport/TTransportException.h> |
| |
| #include <cstddef> |
| // IWYU pragma: no_include <bits/chrono.h> |
| #include <chrono> // IWYU pragma: keep |
| #include <filesystem> |
| #include <ostream> |
| #include <thread> |
| #include <utility> |
| |
| #include "common/config.h" |
| #include "common/status.h" |
| #include "io/fs/broker_file_reader.h" |
| #include "io/fs/broker_file_writer.h" |
| #include "io/fs/file_reader.h" |
| #include "io/fs/file_system.h" |
| #include "io/fs/file_writer.h" |
| #include "io/fs/local_file_system.h" |
| #include "runtime/broker_mgr.h" |
| #include "runtime/exec_env.h" |
| #include "util/slice.h" |
| #include "vec/common/custom_allocator.h" |
| |
| namespace doris::io { |
| |
| #ifdef BE_TEST |
| inline BrokerServiceClientCache* client_cache() { |
| static BrokerServiceClientCache s_client_cache; |
| return &s_client_cache; |
| } |
| |
| inline const std::string& client_id(const TNetworkAddress& addr) { |
| static std::string s_client_id = "doris_unit_test"; |
| return s_client_id; |
| } |
| #else |
| inline BrokerServiceClientCache* client_cache() { |
| return ExecEnv::GetInstance()->broker_client_cache(); |
| } |
| |
| inline const std::string& client_id(const TNetworkAddress& addr) { |
| return ExecEnv::GetInstance()->broker_mgr()->get_client_id(addr); |
| } |
| #endif |
| |
| #ifndef CHECK_BROKER_CLIENT |
| #define CHECK_BROKER_CLIENT(client) \ |
| if (!client || !client->is_alive()) { \ |
| return Status::InternalError("connect to broker failed"); \ |
| } |
| #endif |
| |
| Result<std::shared_ptr<BrokerFileSystem>> BrokerFileSystem::create( |
| const TNetworkAddress& broker_addr, const std::map<std::string, std::string>& broker_prop, |
| std::string id) { |
| std::shared_ptr<BrokerFileSystem> fs( |
| new BrokerFileSystem(broker_addr, broker_prop, std::move(id))); |
| RETURN_IF_ERROR_RESULT(fs->init()); |
| return fs; |
| } |
| |
| BrokerFileSystem::BrokerFileSystem(const TNetworkAddress& broker_addr, |
| const std::map<std::string, std::string>& broker_prop, |
| std::string id) |
| : RemoteFileSystem("", std::move(id), FileSystemType::BROKER), |
| _broker_addr(broker_addr), |
| _broker_prop(broker_prop) {} |
| |
| Status BrokerFileSystem::init() { |
| Status status = Status::OK(); |
| _connection = std::make_shared<BrokerServiceConnection>(client_cache(), _broker_addr, |
| config::thrift_rpc_timeout_ms, &status); |
| return status; |
| } |
| |
| Status BrokerFileSystem::create_file_impl(const Path& path, FileWriterPtr* writer, |
| const FileWriterOptions* opts) { |
| *writer = DORIS_TRY( |
| BrokerFileWriter::create(ExecEnv::GetInstance(), _broker_addr, _broker_prop, path)); |
| return Status::OK(); |
| } |
| |
| Status BrokerFileSystem::open_file_internal(const Path& file, FileReaderSPtr* reader, |
| const FileReaderOptions& opts) { |
| int64_t fsize = opts.file_size; |
| if (fsize <= 0) { |
| RETURN_IF_ERROR(file_size_impl(file, &fsize)); |
| } |
| |
| CHECK_BROKER_CLIENT(_connection); |
| TBrokerOpenReaderRequest request; |
| request.__set_version(TBrokerVersion::VERSION_ONE); |
| request.__set_path(file); |
| request.__set_startOffset(0); |
| request.__set_clientId(client_id(_broker_addr)); |
| request.__set_properties(_broker_prop); |
| |
| std::unique_ptr<TBrokerOpenReaderResponse> response(new TBrokerOpenReaderResponse()); |
| try { |
| Status status; |
| try { |
| (*_connection)->openReader(*response, request); |
| } catch (apache::thrift::transport::TTransportException&) { |
| std::this_thread::sleep_for(std::chrono::seconds(1)); |
| RETURN_IF_ERROR(_connection->reopen()); |
| (*_connection)->openReader(*response, request); |
| } |
| } catch (apache::thrift::TException& e) { |
| return Status::RpcError("failed to open file {}: {}", file.native(), error_msg(e.what())); |
| } |
| |
| if (response->opStatus.statusCode != TBrokerOperationStatusCode::OK) { |
| return Status::IOError("failed to open file {}: {}", file.native(), |
| error_msg(response->opStatus.message)); |
| } |
| *reader = std::make_shared<BrokerFileReader>(_broker_addr, file, fsize, response->fd, |
| _connection); |
| return Status::OK(); |
| } |
| |
| Status BrokerFileSystem::create_directory_impl(const Path& /*path*/, bool /*failed_if_exists*/) { |
| return Status::NotSupported("create directory not implemented!"); |
| } |
| |
| Status BrokerFileSystem::delete_file_impl(const Path& file) { |
| CHECK_BROKER_CLIENT(_connection); |
| try { |
| // rm file from remote path |
| TBrokerDeletePathRequest del_req; |
| TBrokerOperationStatus del_rep; |
| del_req.__set_version(TBrokerVersion::VERSION_ONE); |
| del_req.__set_path(file); |
| del_req.__set_properties(_broker_prop); |
| |
| try { |
| (*_connection)->deletePath(del_rep, del_req); |
| } catch (apache::thrift::transport::TTransportException&) { |
| RETURN_IF_ERROR((*_connection).reopen()); |
| (*_connection)->deletePath(del_rep, del_req); |
| } |
| |
| if (del_rep.statusCode == TBrokerOperationStatusCode::OK) { |
| return Status::OK(); |
| } else { |
| return Status::IOError("failed to delete file {}: {}", file.native(), |
| error_msg(del_rep.message)); |
| } |
| } catch (apache::thrift::TException& e) { |
| return Status::RpcError("failed to delete file {}: {}", file.native(), error_msg(e.what())); |
| } |
| } |
| |
| // Delete all files under path. |
| Status BrokerFileSystem::delete_directory_impl(const Path& dir) { |
| return delete_file_impl(dir); |
| } |
| |
| Status BrokerFileSystem::batch_delete_impl(const std::vector<Path>& files) { |
| for (auto& file : files) { |
| RETURN_IF_ERROR(delete_file_impl(file)); |
| } |
| return Status::OK(); |
| } |
| |
| Status BrokerFileSystem::exists_impl(const Path& path, bool* res) const { |
| CHECK_BROKER_CLIENT(_connection); |
| *res = false; |
| try { |
| TBrokerCheckPathExistRequest check_req; |
| TBrokerCheckPathExistResponse check_rep; |
| check_req.__set_version(TBrokerVersion::VERSION_ONE); |
| check_req.__set_path(path); |
| check_req.__set_properties(_broker_prop); |
| |
| try { |
| (*_connection)->checkPathExist(check_rep, check_req); |
| } catch (apache::thrift::transport::TTransportException&) { |
| RETURN_IF_ERROR((*_connection).reopen()); |
| (*_connection)->checkPathExist(check_rep, check_req); |
| } |
| |
| if (check_rep.opStatus.statusCode != TBrokerOperationStatusCode::OK) { |
| return Status::IOError("failed to check exist of path {}: {}", path.native(), |
| error_msg(check_rep.opStatus.message)); |
| } else if (!check_rep.isPathExist) { |
| *res = false; |
| return Status::OK(); |
| } else { |
| *res = true; |
| return Status::OK(); |
| } |
| } catch (apache::thrift::TException& e) { |
| return Status::RpcError("failed to check exist of path {}: {}", path.native(), |
| error_msg(e.what())); |
| } |
| } |
| |
| Status BrokerFileSystem::file_size_impl(const Path& path, int64_t* file_size) const { |
| CHECK_BROKER_CLIENT(_connection); |
| try { |
| TBrokerFileSizeRequest req; |
| req.__set_version(TBrokerVersion::VERSION_ONE); |
| req.__set_path(path); |
| req.__set_properties(_broker_prop); |
| |
| TBrokerFileSizeResponse resp; |
| try { |
| (*_connection)->fileSize(resp, req); |
| } catch (apache::thrift::transport::TTransportException&) { |
| RETURN_IF_ERROR((*_connection).reopen()); |
| (*_connection)->fileSize(resp, req); |
| } |
| |
| if (resp.opStatus.statusCode != TBrokerOperationStatusCode::OK) { |
| return Status::IOError("failed to get file size of path {}: {}", path.native(), |
| error_msg(resp.opStatus.message)); |
| } |
| if (resp.fileSize < 0) { |
| return Status::IOError("failed to get file size of path {}: size is negtive: {}", |
| path.native(), resp.fileSize); |
| } |
| *file_size = resp.fileSize; |
| return Status::OK(); |
| } catch (apache::thrift::TException& e) { |
| return Status::RpcError("failed to get file size of path {}: {}", path.native(), |
| error_msg(e.what())); |
| } |
| } |
| |
| Status BrokerFileSystem::list_impl(const Path& dir, bool only_file, std::vector<FileInfo>* files, |
| bool* exists) { |
| RETURN_IF_ERROR(exists_impl(dir, exists)); |
| if (!(*exists)) { |
| return Status::OK(); |
| } |
| CHECK_BROKER_CLIENT(_connection); |
| Status status = Status::OK(); |
| try { |
| // get existing files from remote path |
| TBrokerListResponse list_rep; |
| TBrokerListPathRequest list_req; |
| list_req.__set_version(TBrokerVersion::VERSION_ONE); |
| list_req.__set_path(dir / "*"); |
| list_req.__set_isRecursive(false); |
| list_req.__set_properties(_broker_prop); |
| list_req.__set_fileNameOnly(true); // we only need file name, not abs path |
| |
| try { |
| (*_connection)->listPath(list_rep, list_req); |
| } catch (apache::thrift::transport::TTransportException&) { |
| RETURN_IF_ERROR((*_connection).reopen()); |
| (*_connection)->listPath(list_rep, list_req); |
| } |
| |
| if (list_rep.opStatus.statusCode == TBrokerOperationStatusCode::FILE_NOT_FOUND) { |
| LOG(INFO) << "path does not exist: " << dir; |
| *exists = false; |
| return Status::OK(); |
| } else if (list_rep.opStatus.statusCode != TBrokerOperationStatusCode::OK) { |
| return Status::IOError("failed to list dir {}: {}", dir.native(), |
| error_msg(list_rep.opStatus.message)); |
| } |
| LOG(INFO) << "finished to list files from remote path. file num: " << list_rep.files.size(); |
| *exists = true; |
| |
| // split file name and checksum |
| for (const auto& file : list_rep.files) { |
| if (only_file && file.isDir) { |
| // this is not a file |
| continue; |
| } |
| FileInfo file_info; |
| file_info.file_name = file.path; |
| file_info.file_size = file.size; |
| file_info.is_file = !file.isDir; |
| files->emplace_back(std::move(file_info)); |
| } |
| |
| LOG(INFO) << "finished to split files. valid file num: " << files->size(); |
| } catch (apache::thrift::TException& e) { |
| std::stringstream ss; |
| ss << "failed to list files in remote path: " << dir << ", msg: " << e.what(); |
| return Status::RpcError("failed to list dir {}: {}", dir.native(), error_msg(e.what())); |
| } |
| return status; |
| } |
| |
| Status BrokerFileSystem::rename_impl(const Path& orig_name, const Path& new_name) { |
| CHECK_BROKER_CLIENT(_connection); |
| try { |
| TBrokerOperationStatus op_status; |
| TBrokerRenamePathRequest rename_req; |
| rename_req.__set_version(TBrokerVersion::VERSION_ONE); |
| rename_req.__set_srcPath(orig_name); |
| rename_req.__set_destPath(new_name); |
| rename_req.__set_properties(_broker_prop); |
| |
| try { |
| (*_connection)->renamePath(op_status, rename_req); |
| } catch (apache::thrift::transport::TTransportException&) { |
| RETURN_IF_ERROR((*_connection).reopen()); |
| (*_connection)->renamePath(op_status, rename_req); |
| } |
| |
| if (op_status.statusCode != TBrokerOperationStatusCode::OK) { |
| return Status::IOError("failed to rename from {} to {}: {}", orig_name.native(), |
| new_name.native(), error_msg(op_status.message)); |
| } |
| } catch (apache::thrift::TException& e) { |
| return Status::RpcError("failed to rename from {} to {}: {}", orig_name.native(), |
| new_name.native(), error_msg(e.what())); |
| } |
| |
| LOG(INFO) << "finished to rename file. orig: " << orig_name << ", new: " << new_name; |
| return Status::OK(); |
| } |
| |
| Status BrokerFileSystem::upload_impl(const Path& local_file, const Path& remote_file) { |
| // 1. open local file for read |
| FileSystemSPtr local_fs = global_local_filesystem(); |
| FileReaderSPtr local_reader = nullptr; |
| RETURN_IF_ERROR(local_fs->open_file(local_file, &local_reader)); |
| |
| int64_t file_len = local_reader->size(); |
| if (file_len == -1) { |
| return Status::IOError("failed to get length of file: {}: {}", local_file.native(), |
| error_msg("")); |
| } |
| |
| // NOTICE: broker writer must be closed before calling rename |
| FileWriterPtr broker_writer = nullptr; |
| RETURN_IF_ERROR(create_file_impl(remote_file, &broker_writer, nullptr)); |
| |
| constexpr size_t buf_sz = 1024 * 1024; |
| char read_buf[buf_sz]; |
| size_t left_len = file_len; |
| size_t read_offset = 0; |
| size_t bytes_read = 0; |
| while (left_len > 0) { |
| size_t read_len = left_len > buf_sz ? buf_sz : left_len; |
| RETURN_IF_ERROR(local_reader->read_at(read_offset, {read_buf, read_len}, &bytes_read)); |
| // write through broker |
| RETURN_IF_ERROR(broker_writer->append({read_buf, read_len})); |
| |
| read_offset += read_len; |
| left_len -= read_len; |
| } |
| |
| // close manually, because we need to check its close status |
| RETURN_IF_ERROR(broker_writer->close()); |
| LOG(INFO) << "finished to write file via broker. file: " << local_file |
| << ", length: " << file_len; |
| return Status::OK(); |
| } |
| |
| Status BrokerFileSystem::batch_upload_impl(const std::vector<Path>& local_files, |
| const std::vector<Path>& remote_files) { |
| DCHECK(local_files.size() == remote_files.size()); |
| for (int i = 0; i < local_files.size(); ++i) { |
| RETURN_IF_ERROR(upload_impl(local_files[i], remote_files[i])); |
| } |
| return Status::OK(); |
| } |
| |
| Status BrokerFileSystem::download_impl(const Path& remote_file, const Path& local_file) { |
| // 1. open remote file for read |
| FileReaderSPtr broker_reader = nullptr; |
| RETURN_IF_ERROR(open_file_internal(remote_file, &broker_reader, FileReaderOptions::DEFAULT)); |
| |
| // 2. remove the existing local file if exist |
| if (std::filesystem::remove(local_file)) { |
| VLOG(2) << "remove the previously exist local file: " << local_file; |
| } |
| |
| // 3. open local file for write |
| FileSystemSPtr local_fs = global_local_filesystem(); |
| FileWriterPtr local_writer = nullptr; |
| RETURN_IF_ERROR(local_fs->create_file(local_file, &local_writer)); |
| |
| // 4. read remote and write to local |
| VLOG(2) << "read remote file: " << remote_file << " to local: " << local_file; |
| constexpr size_t buf_sz = 1024 * 1024; |
| auto read_buf = make_unique_buffer<uint8_t>(buf_sz); |
| size_t cur_offset = 0; |
| while (true) { |
| size_t read_len = 0; |
| Slice file_slice(read_buf.get(), buf_sz); |
| RETURN_IF_ERROR(broker_reader->read_at(cur_offset, file_slice, &read_len)); |
| cur_offset += read_len; |
| if (read_len == 0) { |
| break; |
| } |
| |
| RETURN_IF_ERROR(local_writer->append({read_buf.get(), read_len})); |
| } // file_handler should be closed before calculating checksum |
| |
| return local_writer->close(); |
| } |
| |
| std::string BrokerFileSystem::error_msg(const std::string& err) const { |
| return fmt::format("({}:{}), {}", _broker_addr.hostname, _broker_addr.port, err); |
| } |
| |
| } // namespace doris::io |