| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "cloud/cloud_warm_up_manager.h" |
| |
| #include <bthread/countdown_event.h> |
| |
| #include <algorithm> |
| #include <cstddef> |
| #include <tuple> |
| |
| #include "cloud/cloud_tablet_mgr.h" |
| #include "common/logging.h" |
| #include "io/cache/block_file_cache_downloader.h" |
| #include "olap/rowset/beta_rowset.h" |
| #include "olap/rowset/segment_v2/inverted_index_desc.h" |
| #include "olap/tablet.h" |
| #include "runtime/exec_env.h" |
| #include "util/time.h" |
| |
| namespace doris { |
| |
| CloudWarmUpManager::CloudWarmUpManager(CloudStorageEngine& engine) : _engine(engine) { |
| _download_thread = std::thread(&CloudWarmUpManager::handle_jobs, this); |
| } |
| |
| CloudWarmUpManager::~CloudWarmUpManager() { |
| { |
| std::lock_guard lock(_mtx); |
| _closed = true; |
| } |
| _cond.notify_all(); |
| if (_download_thread.joinable()) { |
| _download_thread.join(); |
| } |
| } |
| |
| std::unordered_map<std::string, RowsetMetaSharedPtr> snapshot_rs_metas(BaseTablet* tablet) { |
| std::unordered_map<std::string, RowsetMetaSharedPtr> id_to_rowset_meta_map; |
| auto visitor = [&id_to_rowset_meta_map](const RowsetSharedPtr& r) { |
| id_to_rowset_meta_map.emplace(r->rowset_meta()->rowset_id().to_string(), r->rowset_meta()); |
| }; |
| constexpr bool include_stale = false; |
| tablet->traverse_rowsets(visitor, include_stale); |
| return id_to_rowset_meta_map; |
| } |
| |
| void CloudWarmUpManager::handle_jobs() { |
| #ifndef BE_TEST |
| constexpr int WAIT_TIME_SECONDS = 600; |
| while (true) { |
| std::shared_ptr<JobMeta> cur_job = nullptr; |
| { |
| std::unique_lock lock(_mtx); |
| while (!_closed && _pending_job_metas.empty()) { |
| _cond.wait(lock); |
| } |
| if (_closed) break; |
| cur_job = _pending_job_metas.front(); |
| } |
| |
| if (!cur_job) { |
| LOG_WARNING("Warm up job is null"); |
| continue; |
| } |
| for (int64_t tablet_id : cur_job->tablet_ids) { |
| if (_cur_job_id == 0) { // The job is canceled |
| break; |
| } |
| auto res = _engine.tablet_mgr().get_tablet(tablet_id); |
| if (!res.has_value()) { |
| LOG_WARNING("Warm up error ").tag("tablet_id", tablet_id).error(res.error()); |
| continue; |
| } |
| auto tablet = res.value(); |
| auto st = tablet->sync_rowsets(); |
| if (!st) { |
| LOG_WARNING("Warm up error ").tag("tablet_id", tablet_id).error(st); |
| continue; |
| } |
| std::shared_ptr<bthread::CountdownEvent> wait = |
| std::make_shared<bthread::CountdownEvent>(0); |
| auto tablet_meta = tablet->tablet_meta(); |
| auto rs_metas = snapshot_rs_metas(tablet.get()); |
| for (auto& [_, rs] : rs_metas) { |
| for (int64_t seg_id = 0; seg_id < rs->num_segments(); seg_id++) { |
| auto storage_resource = rs->remote_storage_resource(); |
| if (!storage_resource) { |
| LOG(WARNING) << storage_resource.error(); |
| continue; |
| } |
| |
| int64_t expiration_time = |
| tablet_meta->ttl_seconds() == 0 || rs->newest_write_timestamp() <= 0 |
| ? 0 |
| : rs->newest_write_timestamp() + tablet_meta->ttl_seconds(); |
| if (expiration_time <= UnixSeconds()) { |
| expiration_time = 0; |
| } |
| |
| wait->add_count(); |
| // clang-format off |
| _engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta { |
| .path = storage_resource.value()->remote_segment_path(*rs, seg_id), |
| .file_size = rs->segment_file_size(seg_id), |
| .file_system = storage_resource.value()->fs, |
| .ctx = |
| { |
| .expiration_time = expiration_time, |
| .is_dryrun = config::enable_reader_dryrun_when_download_file_cache, |
| }, |
| .download_done = |
| [wait](Status st) { |
| if (!st) { |
| LOG_WARNING("Warm up error ").error(st); |
| } |
| wait->signal(); |
| }, |
| }); |
| |
| auto download_idx_file = [&](const io::Path& idx_path) { |
| io::DownloadFileMeta meta { |
| .path = idx_path, |
| .file_size = -1, |
| .file_system = storage_resource.value()->fs, |
| .ctx = |
| { |
| .expiration_time = expiration_time, |
| .is_dryrun = config::enable_reader_dryrun_when_download_file_cache, |
| }, |
| .download_done = |
| [wait](Status st) { |
| if (!st) { |
| LOG_WARNING("Warm up error ").error(st); |
| } |
| wait->signal(); |
| }, |
| }; |
| // clang-format on |
| _engine.file_cache_block_downloader().submit_download_task(std::move(meta)); |
| }; |
| auto schema_ptr = rs->tablet_schema(); |
| auto idx_version = schema_ptr->get_inverted_index_storage_format(); |
| if (idx_version == InvertedIndexStorageFormatPB::V1) { |
| for (const auto& index : schema_ptr->inverted_indexes()) { |
| wait->add_count(); |
| auto idx_path = storage_resource.value()->remote_idx_v1_path( |
| *rs, seg_id, index->index_id(), index->get_index_suffix()); |
| download_idx_file(idx_path); |
| } |
| } else { |
| if (schema_ptr->has_inverted_index()) { |
| wait->add_count(); |
| auto idx_path = |
| storage_resource.value()->remote_idx_v2_path(*rs, seg_id); |
| download_idx_file(idx_path); |
| } |
| } |
| } |
| } |
| timespec time; |
| time.tv_sec = UnixSeconds() + WAIT_TIME_SECONDS; |
| if (!wait->timed_wait(time)) { |
| LOG_WARNING("Warm up tablet {} take a long time", tablet_meta->tablet_id()); |
| } |
| } |
| { |
| std::unique_lock lock(_mtx); |
| _finish_job.push_back(cur_job); |
| _pending_job_metas.pop_front(); |
| } |
| } |
| #endif |
| } |
| |
| JobMeta::JobMeta(const TJobMeta& meta) |
| : be_ip(meta.be_ip), brpc_port(meta.brpc_port), tablet_ids(meta.tablet_ids) { |
| switch (meta.download_type) { |
| case TDownloadType::BE: |
| download_type = DownloadType::BE; |
| break; |
| case TDownloadType::S3: |
| download_type = DownloadType::S3; |
| break; |
| } |
| } |
| |
| Status CloudWarmUpManager::check_and_set_job_id(int64_t job_id) { |
| std::lock_guard lock(_mtx); |
| if (_cur_job_id == 0) { |
| _cur_job_id = job_id; |
| } |
| Status st = Status::OK(); |
| if (_cur_job_id != job_id) { |
| st = Status::InternalError("The job {} is running", _cur_job_id); |
| } |
| return st; |
| } |
| |
| Status CloudWarmUpManager::check_and_set_batch_id(int64_t job_id, int64_t batch_id, bool* retry) { |
| std::lock_guard lock(_mtx); |
| Status st = Status::OK(); |
| if (_cur_job_id != 0 && _cur_job_id != job_id) { |
| st = Status::InternalError("The job {} is not current job, current job is {}", job_id, |
| _cur_job_id); |
| return st; |
| } |
| if (_cur_job_id == 0) { |
| _cur_job_id = job_id; |
| } |
| if (_cur_batch_id == batch_id) { |
| *retry = true; |
| return st; |
| } |
| if (_pending_job_metas.empty()) { |
| _cur_batch_id = batch_id; |
| } else { |
| st = Status::InternalError("The batch {} is not finish", _cur_batch_id); |
| } |
| return st; |
| } |
| |
| void CloudWarmUpManager::add_job(const std::vector<TJobMeta>& job_metas) { |
| { |
| std::lock_guard lock(_mtx); |
| std::for_each(job_metas.begin(), job_metas.end(), [this](const TJobMeta& meta) { |
| _pending_job_metas.emplace_back(std::make_shared<JobMeta>(meta)); |
| }); |
| } |
| _cond.notify_all(); |
| } |
| |
| #ifdef BE_TEST |
| void CloudWarmUpManager::consumer_job() { |
| { |
| std::unique_lock lock(_mtx); |
| _finish_job.push_back(_pending_job_metas.front()); |
| _pending_job_metas.pop_front(); |
| } |
| } |
| |
| #endif |
| |
| std::tuple<int64_t, int64_t, int64_t, int64_t> CloudWarmUpManager::get_current_job_state() { |
| std::lock_guard lock(_mtx); |
| return std::make_tuple(_cur_job_id, _cur_batch_id, _pending_job_metas.size(), |
| _finish_job.size()); |
| } |
| |
| Status CloudWarmUpManager::clear_job(int64_t job_id) { |
| std::lock_guard lock(_mtx); |
| Status st = Status::OK(); |
| if (job_id == _cur_job_id) { |
| _cur_job_id = 0; |
| _cur_batch_id = -1; |
| _pending_job_metas.clear(); |
| _finish_job.clear(); |
| } else { |
| st = Status::InternalError("The job {} is not current job, current job is {}", job_id, |
| _cur_job_id); |
| } |
| return st; |
| } |
| |
| } // namespace doris |