| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "olap/wal/wal_manager.h" |
| |
| #include <absl/strings/str_split.h> |
| #include <bvar/bvar.h> |
| #include <glog/logging.h> |
| |
| #include <chrono> |
| #include <filesystem> |
| #include <shared_mutex> |
| #include <thread> |
| #include <unordered_map> |
| #include <vector> |
| |
| #include "common/config.h" |
| #include "common/status.h" |
| #include "io/fs/local_file_system.h" |
| #include "olap/wal/wal_dirs_info.h" |
| #include "runtime/exec_env.h" |
| #include "runtime/fragment_mgr.h" |
| #include "util/parse_util.h" |
| #include "vec/exec/format/wal/wal_reader.h" |
| |
| namespace doris { |
| |
| bvar::Status<size_t> g_wal_total_count("wal_total_count", 0); |
| |
| WalManager::WalManager(ExecEnv* exec_env, const std::string& wal_dir_list) |
| : _exec_env(exec_env), |
| _stop(false), |
| _stop_background_threads_latch(1), |
| _first_replay(true) { |
| _wal_dirs = absl::StrSplit(wal_dir_list, ";", absl::SkipWhitespace()); |
| static_cast<void>(ThreadPoolBuilder("GroupCommitReplayWalThreadPool") |
| .set_min_threads(1) |
| .set_max_threads(config::group_commit_relay_wal_threads) |
| .build(&_thread_pool)); |
| _wal_dirs_info = WalDirsInfo::create_unique(); |
| } |
| |
| WalManager::~WalManager() { |
| LOG(INFO) << "WalManager is destoried"; |
| } |
| |
| bool WalManager::is_running() { |
| return !_stop.load(); |
| } |
| |
| void WalManager::stop() { |
| if (!this->_stop.load()) { |
| this->_stop.store(true); |
| _stop_relay_wal(); |
| _stop_background_threads_latch.count_down(); |
| if (_replay_thread) { |
| _replay_thread->join(); |
| } |
| if (_update_wal_dirs_info_thread) { |
| _update_wal_dirs_info_thread->join(); |
| } |
| _thread_pool->shutdown(); |
| LOG(INFO) << "WalManager is stopped"; |
| } |
| } |
| |
| Status WalManager::init() { |
| RETURN_IF_ERROR(_init_wal_dirs_conf()); |
| RETURN_IF_ERROR(_init_wal_dirs()); |
| RETURN_IF_ERROR(_init_wal_dirs_info()); |
| return Thread::create( |
| "WalMgr", "replay_wal", [this]() { static_cast<void>(this->_replay_background()); }, |
| &_replay_thread); |
| } |
| |
| Status WalManager::_init_wal_dirs_conf() { |
| std::vector<std::string> tmp_dirs; |
| if (_wal_dirs.empty()) { |
| // default case. |
| for (const StorePath& path : ExecEnv::GetInstance()->store_paths()) { |
| tmp_dirs.emplace_back(path.path + "/wal"); |
| } |
| } else { |
| // user config must be absolute path. |
| for (const std::string& wal_dir : _wal_dirs) { |
| if (std::filesystem::path(wal_dir).is_absolute()) { |
| tmp_dirs.emplace_back(wal_dir); |
| } else { |
| return Status::InternalError( |
| "BE config group_commit_replay_wal_dir has to be absolute path!"); |
| } |
| } |
| } |
| _wal_dirs = tmp_dirs; |
| return Status::OK(); |
| } |
| |
| Status WalManager::_init_wal_dirs() { |
| bool exists = false; |
| for (auto wal_dir : _wal_dirs) { |
| std::string tmp_dir = wal_dir + "/" + _tmp; |
| LOG(INFO) << "wal_dir:" << wal_dir << ", tmp_dir:" << tmp_dir; |
| RETURN_IF_ERROR(io::global_local_filesystem()->exists(wal_dir, &exists)); |
| if (!exists) { |
| RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(wal_dir)); |
| } |
| RETURN_IF_ERROR(io::global_local_filesystem()->exists(tmp_dir, &exists)); |
| if (!exists) { |
| RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(tmp_dir)); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| Status WalManager::_init_wal_dirs_info() { |
| for (const std::string& wal_dir : _wal_dirs) { |
| size_t available_bytes; |
| #ifndef BE_TEST |
| size_t disk_capacity_bytes; |
| RETURN_IF_ERROR(io::global_local_filesystem()->get_space_info(wal_dir, &disk_capacity_bytes, |
| &available_bytes)); |
| #else |
| available_bytes = wal_limit_test_bytes; |
| #endif |
| bool is_percent = true; |
| int64_t wal_disk_limit = ParseUtil::parse_mem_spec(config::group_commit_wal_max_disk_limit, |
| -1, available_bytes, &is_percent); |
| if (wal_disk_limit < 0) { |
| return Status::InternalError( |
| "group_commit_wal_max_disk_limit config is wrong, please check your config!"); |
| } |
| // if there are some wal files in wal dir, we need to add it to wal disk limit. |
| size_t wal_dir_size = 0; |
| #ifndef BE_TEST |
| RETURN_IF_ERROR(io::global_local_filesystem()->directory_size(wal_dir, &wal_dir_size)); |
| #endif |
| if (is_percent) { |
| wal_disk_limit += wal_dir_size; |
| } |
| RETURN_IF_ERROR(_wal_dirs_info->add(wal_dir, wal_disk_limit, wal_dir_size, 0)); |
| |
| #ifdef BE_TEST |
| wal_limit_test_bytes = wal_disk_limit; |
| #endif |
| } |
| #ifndef BE_TEST |
| return Thread::create( |
| "WalMgr", "update_wal_dir_info", |
| [this]() { static_cast<void>(this->_update_wal_dir_info_thread()); }, |
| &_update_wal_dirs_info_thread); |
| #else |
| return Status::OK(); |
| #endif |
| } |
| |
| void WalManager::add_wal_queue(int64_t table_id, int64_t wal_id) { |
| std::lock_guard<std::shared_mutex> wrlock(_wal_queue_lock); |
| LOG(INFO) << "add wal to queue, table_id: " << table_id << ", wal_id: " << wal_id; |
| auto it = _wal_queues.find(table_id); |
| if (it == _wal_queues.end()) { |
| std::set<int64_t> tmp_set; |
| tmp_set.insert(wal_id); |
| _wal_queues.emplace(table_id, tmp_set); |
| } else { |
| it->second.insert(wal_id); |
| } |
| } |
| |
| void WalManager::erase_wal_queue(int64_t table_id, int64_t wal_id) { |
| std::lock_guard<std::shared_mutex> wrlock(_wal_queue_lock); |
| auto it = _wal_queues.find(table_id); |
| if (it != _wal_queues.end()) { |
| LOG(INFO) << "remove wal from queue, table_id: " << table_id << ", wal_id: " << wal_id; |
| it->second.erase(wal_id); |
| if (it->second.empty()) { |
| _wal_queues.erase(table_id); |
| } |
| } |
| } |
| |
| size_t WalManager::get_wal_queue_size(int64_t table_id) { |
| std::lock_guard<std::shared_mutex> wrlock(_wal_queue_lock); |
| size_t count = 0; |
| if (table_id > 0) { |
| auto it = _wal_queues.find(table_id); |
| if (it != _wal_queues.end()) { |
| return it->second.size(); |
| } else { |
| return 0; |
| } |
| } else { |
| // table_id is -1 meaning get all table wal size |
| for (auto& [_, table_wals] : _wal_queues) { |
| count += table_wals.size(); |
| } |
| } |
| return count; |
| } |
| |
| Status WalManager::create_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id, |
| const std::string& label, std::string& base_path, |
| uint32_t wal_version) { |
| base_path = _wal_dirs_info->get_available_random_wal_dir(); |
| std::stringstream ss; |
| ss << base_path << "/" << std::to_string(db_id) << "/" << std::to_string(table_id) << "/" |
| << std::to_string(wal_version) << "_" << _exec_env->cluster_info()->backend_id << "_" |
| << std::to_string(wal_id) << "_" << label; |
| { |
| std::lock_guard<std::shared_mutex> wrlock(_wal_path_lock); |
| auto it = _wal_path_map.find(wal_id); |
| if (it != _wal_path_map.end()) { |
| return Status::InternalError("wal_id {} already in wal_path_map", wal_id); |
| } |
| _wal_path_map.emplace(wal_id, ss.str()); |
| } |
| return Status::OK(); |
| } |
| |
| Status WalManager::get_wal_path(int64_t wal_id, std::string& wal_path) { |
| std::shared_lock rdlock(_wal_path_lock); |
| auto it = _wal_path_map.find(wal_id); |
| if (it != _wal_path_map.end()) { |
| wal_path = _wal_path_map[wal_id]; |
| } else { |
| return Status::InternalError("can not find wal_id {} in wal_path_map", wal_id); |
| } |
| return Status::OK(); |
| } |
| |
| Status WalManager::parse_wal_path(const std::string& file_name, int64_t& version, |
| int64_t& backend_id, int64_t& wal_id, std::string& label) { |
| try { |
| // find version |
| auto pos = file_name.find("_"); |
| version = std::strtoll(file_name.substr(0, pos).c_str(), NULL, 10); |
| // find be id |
| auto substring1 = file_name.substr(pos + 1); |
| pos = substring1.find("_"); |
| backend_id = std::strtoll(substring1.substr(0, pos).c_str(), NULL, 10); |
| // find wal id |
| auto substring2 = substring1.substr(pos + 1); |
| pos = substring2.find("_"); |
| wal_id = std::strtoll(substring2.substr(0, pos).c_str(), NULL, 10); |
| // find label |
| label = substring2.substr(pos + 1); |
| VLOG_DEBUG << "version:" << version << "backend_id:" << backend_id << ",wal_id:" << wal_id |
| << ",label:" << label; |
| } catch (const std::invalid_argument& e) { |
| return Status::InvalidArgument("Invalid format, {}", e.what()); |
| } |
| return Status::OK(); |
| } |
| |
| Status WalManager::_load_wals() { |
| std::vector<ScanWalInfo> wals; |
| for (auto wal_dir : _wal_dirs) { |
| WARN_IF_ERROR(_scan_wals(wal_dir, wals), fmt::format("fail to scan wal dir={}", wal_dir)); |
| } |
| for (const auto& wal : wals) { |
| bool exists = false; |
| WARN_IF_ERROR(io::global_local_filesystem()->exists(wal.wal_path, &exists), |
| fmt::format("fail to check exist on wal file={}", wal.wal_path)); |
| if (!exists) { |
| continue; |
| } |
| LOG(INFO) << "find wal: " << wal.wal_path; |
| { |
| std::lock_guard<std::shared_mutex> wrlock(_wal_path_lock); |
| auto it = _wal_path_map.find(wal.wal_id); |
| if (it != _wal_path_map.end()) { |
| LOG(INFO) << "wal_id " << wal.wal_id << " already in wal_path_map, skip it"; |
| continue; |
| } |
| _wal_path_map.emplace(wal.wal_id, wal.wal_path); |
| } |
| // this config is use for test p0 case in pipeline |
| if (config::group_commit_wait_replay_wal_finish) { |
| auto lock = std::make_shared<std::mutex>(); |
| auto cv = std::make_shared<std::condition_variable>(); |
| auto add_st = add_wal_cv_map(wal.wal_id, lock, cv); |
| if (!add_st.ok()) { |
| LOG(WARNING) << "fail to add wal_id " << wal.wal_id << " to wal_cv_map"; |
| continue; |
| } |
| } |
| _exec_env->wal_mgr()->add_wal_queue(wal.tb_id, wal.wal_id); |
| WARN_IF_ERROR(add_recover_wal(wal.db_id, wal.tb_id, wal.wal_id, wal.wal_path), |
| fmt::format("Failed to add recover wal={}", wal.wal_path)); |
| } |
| return Status::OK(); |
| } |
| |
| Status WalManager::_scan_wals(const std::string& wal_path, std::vector<ScanWalInfo>& res) { |
| bool exists = false; |
| auto last_total_size = res.size(); |
| std::vector<io::FileInfo> dbs; |
| Status st = io::global_local_filesystem()->list(wal_path, false, &dbs, &exists); |
| if (!st.ok()) { |
| LOG(WARNING) << "failed list files for wal_dir=" << wal_path << ", st=" << st.to_string(); |
| return st; |
| } |
| for (const auto& database_id : dbs) { |
| if (database_id.is_file || database_id.file_name == _tmp) { |
| continue; |
| } |
| std::vector<io::FileInfo> tables; |
| auto db_path = wal_path + "/" + database_id.file_name; |
| st = io::global_local_filesystem()->list(db_path, false, &tables, &exists); |
| if (!st.ok()) { |
| LOG(WARNING) << "failed list files for wal_dir=" << db_path |
| << ", st=" << st.to_string(); |
| return st; |
| } |
| for (const auto& table_id : tables) { |
| if (table_id.is_file) { |
| continue; |
| } |
| std::vector<io::FileInfo> wals; |
| auto table_path = db_path + "/" + table_id.file_name; |
| st = io::global_local_filesystem()->list(table_path, false, &wals, &exists); |
| if (!st.ok()) { |
| LOG(WARNING) << "failed list files for wal_dir=" << table_path |
| << ", st=" << st.to_string(); |
| return st; |
| } |
| if (wals.empty()) { |
| continue; |
| } |
| int64_t db_id = -1; |
| int64_t tb_id = -1; |
| try { |
| db_id = std::strtoll(database_id.file_name.c_str(), NULL, 10); |
| tb_id = std::strtoll(table_id.file_name.c_str(), NULL, 10); |
| } catch (const std::invalid_argument& e) { |
| return Status::InvalidArgument("Invalid format, {}", e.what()); |
| } |
| for (const auto& wal : wals) { |
| int64_t version = -1; |
| int64_t backend_id = -1; |
| int64_t wal_id = -1; |
| std::string label = ""; |
| auto parse_st = parse_wal_path(wal.file_name, version, backend_id, wal_id, label); |
| if (!parse_st.ok()) { |
| LOG(WARNING) << "fail to parse file=" << wal.file_name |
| << ",st=" << parse_st.to_string(); |
| continue; |
| } |
| auto wal_file = table_path + "/" + wal.file_name; |
| struct ScanWalInfo scan_wal_info; |
| scan_wal_info.wal_path = wal_file; |
| scan_wal_info.db_id = db_id; |
| scan_wal_info.tb_id = tb_id; |
| scan_wal_info.wal_id = wal_id; |
| scan_wal_info.be_id = backend_id; |
| res.emplace_back(scan_wal_info); |
| } |
| } |
| } |
| LOG(INFO) << "Finish list wal_dir=" << wal_path |
| << ", wal count=" << std::to_string(res.size() - last_total_size); |
| return Status::OK(); |
| } |
| |
| Status WalManager::_replay_background() { |
| do { |
| if (_stop.load()) { |
| break; |
| } |
| // port == 0 means not received heartbeat yet |
| if (_exec_env->cluster_info() != nullptr && |
| _exec_env->cluster_info()->master_fe_addr.port == 0) { |
| continue; |
| } |
| // replay residual wal,only replay once |
| bool expected = true; |
| if (_first_replay.compare_exchange_strong(expected, false)) { |
| RETURN_IF_ERROR(_load_wals()); |
| } |
| g_wal_total_count.set_value(get_wal_queue_size(-1)); |
| // replay wal of current process |
| std::vector<int64_t> replay_tables; |
| { |
| std::lock_guard<std::shared_mutex> wrlock(_table_lock); |
| auto it = _table_map.begin(); |
| while (it != _table_map.end()) { |
| if (it->second->size() > 0) { |
| replay_tables.push_back(it->first); |
| } |
| it++; |
| } |
| } |
| for (const auto& table_id : replay_tables) { |
| RETURN_IF_ERROR(_thread_pool->submit_func([table_id, this] { |
| auto st = this->_table_map[table_id]->replay_wals(); |
| if (!st.ok()) { |
| LOG(WARNING) << "failed to submit replay wal for table=" << table_id; |
| } |
| })); |
| } |
| } while (!_stop_background_threads_latch.wait_for( |
| std::chrono::seconds(config::group_commit_replay_wal_retry_interval_seconds))); |
| return Status::OK(); |
| } |
| |
| Status WalManager::add_recover_wal(int64_t db_id, int64_t table_id, int64_t wal_id, |
| std::string wal) { |
| std::lock_guard<std::shared_mutex> wrlock(_table_lock); |
| std::shared_ptr<WalTable> table_ptr; |
| auto it = _table_map.find(table_id); |
| if (it == _table_map.end()) { |
| table_ptr = std::make_shared<WalTable>(_exec_env, db_id, table_id); |
| _table_map.emplace(table_id, table_ptr); |
| } else { |
| table_ptr = it->second; |
| } |
| table_ptr->add_wal(wal_id, wal); |
| #ifndef BE_TEST |
| WARN_IF_ERROR(update_wal_dir_limit(get_base_wal_path(wal)), |
| "Failed to update wal dir limit while add recover wal!"); |
| WARN_IF_ERROR(update_wal_dir_used(get_base_wal_path(wal)), |
| "Failed to update wal dir used while add recove wal!"); |
| #endif |
| return Status::OK(); |
| } |
| |
| size_t WalManager::get_wal_table_size(int64_t table_id) { |
| std::shared_lock rdlock(_table_lock); |
| auto it = _table_map.find(table_id); |
| if (it != _table_map.end()) { |
| return it->second->size(); |
| } else { |
| return 0; |
| } |
| } |
| |
| void WalManager::_stop_relay_wal() { |
| std::lock_guard<std::shared_mutex> wrlock(_table_lock); |
| for (auto& [_, wal_table] : _table_map) { |
| wal_table->stop(); |
| } |
| } |
| |
| size_t WalManager::get_max_available_size() { |
| return _wal_dirs_info->get_max_available_size(); |
| } |
| |
| std::string WalManager::get_wal_dirs_info_string() { |
| return _wal_dirs_info->get_wal_dirs_info_string(); |
| } |
| |
| Status WalManager::update_wal_dir_limit(const std::string& wal_dir, size_t limit) { |
| return _wal_dirs_info->update_wal_dir_limit(wal_dir, limit); |
| } |
| |
| Status WalManager::update_wal_dir_used(const std::string& wal_dir, size_t used) { |
| return _wal_dirs_info->update_wal_dir_used(wal_dir, used); |
| } |
| |
| Status WalManager::update_wal_dir_estimated_wal_bytes(const std::string& wal_dir, |
| size_t increase_estimated_wal_bytes, |
| size_t decrease_estimated_wal_bytes) { |
| return _wal_dirs_info->update_wal_dir_estimated_wal_bytes(wal_dir, increase_estimated_wal_bytes, |
| decrease_estimated_wal_bytes); |
| } |
| |
| Status WalManager::_update_wal_dir_info_thread() { |
| while (!_stop.load()) { |
| if (!ExecEnv::ready()) { |
| LOG(INFO) << "Sleep 1s to wait for storage engine init."; |
| std::this_thread::sleep_for(std::chrono::milliseconds(1000)); |
| continue; |
| } |
| static_cast<void>(_wal_dirs_info->update_all_wal_dir_limit()); |
| static_cast<void>(_wal_dirs_info->update_all_wal_dir_used()); |
| LOG_EVERY_N(INFO, 100) << "Scheduled(every 10s) WAL info: " << get_wal_dirs_info_string(); |
| std::this_thread::sleep_for(std::chrono::milliseconds(100)); |
| } |
| return Status::OK(); |
| } |
| |
| Status WalManager::get_wal_dir_available_size(const std::string& wal_dir, size_t* available_bytes) { |
| return _wal_dirs_info->get_wal_dir_available_size(wal_dir, available_bytes); |
| } |
| |
| std::string WalManager::get_base_wal_path(const std::string& wal_path_str) { |
| io::Path wal_path = wal_path_str; |
| for (int i = 0; i < 3; ++i) { |
| if (!wal_path.has_parent_path()) { |
| return ""; |
| } |
| wal_path = wal_path.parent_path(); |
| } |
| return wal_path.string(); |
| } |
| |
| Status WalManager::add_wal_cv_map(int64_t wal_id, std::shared_ptr<std::mutex> lock, |
| std::shared_ptr<std::condition_variable> cv) { |
| std::lock_guard<std::shared_mutex> wrlock(_wal_cv_lock); |
| auto it = _wal_cv_map.find(wal_id); |
| if (it != _wal_cv_map.end()) { |
| return Status::InternalError("wal {} is already in _wal_cv_map ", wal_id); |
| } |
| auto pair = std::make_pair(lock, cv); |
| _wal_cv_map.emplace(wal_id, pair); |
| LOG(INFO) << "add " << wal_id << " to _wal_cv_map"; |
| return Status::OK(); |
| } |
| |
| Status WalManager::erase_wal_cv_map(int64_t wal_id) { |
| std::lock_guard<std::shared_mutex> wrlock(_wal_cv_lock); |
| if (_wal_cv_map.erase(wal_id)) { |
| LOG(INFO) << "erase " << wal_id << " from _wal_cv_map"; |
| } else { |
| return Status::InternalError("fail to erase wal {} from wal_cv_map", wal_id); |
| } |
| return Status::OK(); |
| } |
| |
| Status WalManager::wait_replay_wal_finish(int64_t wal_id) { |
| std::shared_ptr<std::mutex> lock = nullptr; |
| std::shared_ptr<std::condition_variable> cv = nullptr; |
| auto st = get_lock_and_cv(wal_id, lock, cv); |
| if (st.ok()) { |
| std::unique_lock l(*(lock)); |
| LOG(INFO) << "start wait " << wal_id; |
| if (cv->wait_for(l, std::chrono::seconds(180)) == std::cv_status::timeout) { |
| LOG(WARNING) << "wait for " << wal_id << " is time out"; |
| } |
| LOG(INFO) << "get wal " << wal_id << ",finish wait"; |
| RETURN_IF_ERROR(erase_wal_cv_map(wal_id)); |
| LOG(INFO) << "erase wal " << wal_id; |
| } |
| return Status::OK(); |
| } |
| |
| Status WalManager::notify_relay_wal(int64_t wal_id) { |
| std::shared_ptr<std::mutex> lock = nullptr; |
| std::shared_ptr<std::condition_variable> cv = nullptr; |
| auto st = get_lock_and_cv(wal_id, lock, cv); |
| if (st.ok()) { |
| std::unique_lock l(*(lock)); |
| cv->notify_all(); |
| LOG(INFO) << "get wal " << wal_id << ",notify all"; |
| } |
| return Status::OK(); |
| } |
| |
| Status WalManager::get_lock_and_cv(int64_t wal_id, std::shared_ptr<std::mutex>& lock, |
| std::shared_ptr<std::condition_variable>& cv) { |
| std::lock_guard<std::shared_mutex> wrlock(_wal_cv_lock); |
| auto it = _wal_cv_map.find(wal_id); |
| if (it == _wal_cv_map.end()) { |
| return Status::InternalError("cannot find txn {} in _wal_cv_map", wal_id); |
| } |
| lock = it->second.first; |
| cv = it->second.second; |
| return Status::OK(); |
| } |
| |
| Status WalManager::delete_wal(int64_t table_id, int64_t wal_id) { |
| std::string wal_path; |
| { |
| std::lock_guard<std::shared_mutex> wrlock(_wal_path_lock); |
| auto it = _wal_path_map.find(wal_id); |
| if (it != _wal_path_map.end()) { |
| wal_path = it->second; |
| auto st = io::global_local_filesystem()->delete_file(wal_path); |
| if (st.ok()) { |
| LOG(INFO) << "delete wal=" << wal_path; |
| } else { |
| LOG(WARNING) << "failed to delete wal=" << wal_path << ", st=" << st.to_string(); |
| } |
| _wal_path_map.erase(wal_id); |
| } |
| } |
| erase_wal_queue(table_id, wal_id); |
| return Status::OK(); |
| } |
| |
| Status WalManager::rename_to_tmp_path(const std::string wal, int64_t table_id, int64_t wal_id) { |
| io::Path wal_path = wal; |
| std::list<std::string> path_element; |
| for (int i = 0; i < 3; ++i) { |
| if (!wal_path.has_parent_path()) { |
| return Status::InternalError("parent path is not enough when rename " + wal); |
| } |
| path_element.push_front(wal_path.filename().string()); |
| wal_path = wal_path.parent_path(); |
| } |
| wal_path.append(_tmp); |
| for (auto path : path_element) { |
| wal_path.append(path); |
| } |
| bool exists = false; |
| RETURN_IF_ERROR(io::global_local_filesystem()->exists(wal_path.parent_path(), &exists)); |
| if (!exists) { |
| RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(wal_path.parent_path())); |
| } |
| auto res = std::rename(wal.c_str(), wal_path.string().c_str()); |
| if (res < 0) { |
| LOG(INFO) << "failed to rename wal from " << wal << " to " << wal_path.string(); |
| return Status::InternalError("rename fail on path " + wal); |
| } |
| LOG(INFO) << "rename wal from " << wal << " to " << wal_path.string(); |
| { |
| std::lock_guard<std::shared_mutex> wrlock(_wal_path_lock); |
| auto it = _wal_path_map.find(wal_id); |
| if (it != _wal_path_map.end()) { |
| _wal_path_map.erase(wal_id); |
| } else { |
| LOG(WARNING) << "can't find " << wal_id << " in _wal_path_map when trying to rename"; |
| } |
| } |
| erase_wal_queue(table_id, wal_id); |
| return Status::OK(); |
| } |
| |
| } // namespace doris |