| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include <gen_cpp/Types_types.h> |
| #include <gen_cpp/olap_file.pb.h> |
| #include <glog/logging.h> |
| #include <rapidjson/prettywriter.h> |
| #include <rapidjson/stringbuffer.h> |
| #include <stdint.h> |
| #include <sys/types.h> |
| |
| #include <algorithm> |
| #include <atomic> |
| // IWYU pragma: no_include <bits/chrono.h> |
| #include <chrono> // IWYU pragma: keep |
| #include <cmath> |
| #include <condition_variable> |
| #include <cstdint> |
| #include <ctime> |
| #include <functional> |
| #include <map> |
| #include <memory> |
| #include <mutex> |
| #include <ostream> |
| #include <random> |
| #include <shared_mutex> |
| #include <string> |
| #include <thread> |
| #include <unordered_set> |
| #include <utility> |
| #include <vector> |
| |
| #include "agent/utils.h" |
| #include "common/config.h" |
| #include "common/logging.h" |
| #include "common/status.h" |
| #include "cpp/sync_point.h" |
| #include "gen_cpp/FrontendService.h" |
| #include "gen_cpp/internal_service.pb.h" |
| #include "io/fs/file_writer.h" // IWYU pragma: keep |
| #include "io/fs/path.h" |
| #include "olap/base_tablet.h" |
| #include "olap/cold_data_compaction.h" |
| #include "olap/compaction_permit_limiter.h" |
| #include "olap/cumulative_compaction.h" |
| #include "olap/cumulative_compaction_policy.h" |
| #include "olap/cumulative_compaction_time_series_policy.h" |
| #include "olap/data_dir.h" |
| #include "olap/olap_common.h" |
| #include "olap/olap_define.h" |
| #include "olap/rowset/segcompaction.h" |
| #include "olap/schema_change.h" |
| #include "olap/single_replica_compaction.h" |
| #include "olap/storage_engine.h" |
| #include "olap/storage_policy.h" |
| #include "olap/tablet.h" |
| #include "olap/tablet_manager.h" |
| #include "olap/tablet_meta.h" |
| #include "olap/tablet_meta_manager.h" |
| #include "olap/tablet_schema.h" |
| #include "olap/task/engine_publish_version_task.h" |
| #include "olap/task/index_builder.h" |
| #include "runtime/client_cache.h" |
| #include "runtime/memory/cache_manager.h" |
| #include "runtime/memory/global_memory_arbitrator.h" |
| #include "util/countdown_latch.h" |
| #include "util/debug_points.h" |
| #include "util/doris_metrics.h" |
| #include "util/mem_info.h" |
| #include "util/metrics.h" |
| #include "util/thread.h" |
| #include "util/threadpool.h" |
| #include "util/thrift_rpc_helper.h" |
| #include "util/time.h" |
| #include "util/uid_util.h" |
| #include "util/work_thread_pool.hpp" |
| |
| using std::string; |
| |
| namespace doris { |
| #include "common/compile_check_begin.h" |
| using io::Path; |
| |
| // number of running SCHEMA-CHANGE threads |
| volatile uint32_t g_schema_change_active_threads = 0; |
| bvar::Status<int64_t> g_cumu_compaction_task_num_per_round("cumu_compaction_task_num_per_round", 0); |
| bvar::Status<int64_t> g_base_compaction_task_num_per_round("base_compaction_task_num_per_round", 0); |
| |
| static const uint64_t DEFAULT_SEED = 104729; |
| static const uint64_t MOD_PRIME = 7652413; |
| |
| CompactionSubmitRegistry::CompactionSubmitRegistry(CompactionSubmitRegistry&& r) { |
| std::swap(_tablet_submitted_cumu_compaction, r._tablet_submitted_cumu_compaction); |
| std::swap(_tablet_submitted_base_compaction, r._tablet_submitted_base_compaction); |
| std::swap(_tablet_submitted_full_compaction, r._tablet_submitted_full_compaction); |
| } |
| |
| CompactionSubmitRegistry CompactionSubmitRegistry::create_snapshot() { |
| // full compaction is not engaged in this method |
| std::unique_lock<std::mutex> l(_tablet_submitted_compaction_mutex); |
| CompactionSubmitRegistry registry; |
| registry._tablet_submitted_base_compaction = _tablet_submitted_base_compaction; |
| registry._tablet_submitted_cumu_compaction = _tablet_submitted_cumu_compaction; |
| return registry; |
| } |
| |
| void CompactionSubmitRegistry::reset(const std::vector<DataDir*>& stores) { |
| // full compaction is not engaged in this method |
| for (const auto& store : stores) { |
| _tablet_submitted_cumu_compaction[store] = {}; |
| _tablet_submitted_base_compaction[store] = {}; |
| } |
| } |
| |
| uint32_t CompactionSubmitRegistry::count_executing_compaction(DataDir* dir, |
| CompactionType compaction_type) { |
| // non-lock, used in snapshot |
| const auto& compaction_tasks = _get_tablet_set(dir, compaction_type); |
| return cast_set<uint32_t>(std::count_if( |
| compaction_tasks.begin(), compaction_tasks.end(), |
| [](const auto& task) { return task->compaction_stage == CompactionStage::EXECUTING; })); |
| } |
| |
| uint32_t CompactionSubmitRegistry::count_executing_cumu_and_base(DataDir* dir) { |
| // non-lock, used in snapshot |
| return count_executing_compaction(dir, CompactionType::BASE_COMPACTION) + |
| count_executing_compaction(dir, CompactionType::CUMULATIVE_COMPACTION); |
| } |
| |
| bool CompactionSubmitRegistry::has_compaction_task(DataDir* dir, CompactionType compaction_type) { |
| // non-lock, used in snapshot |
| return !_get_tablet_set(dir, compaction_type).empty(); |
| } |
| |
| std::vector<TabletSharedPtr> CompactionSubmitRegistry::pick_topn_tablets_for_compaction( |
| TabletManager* tablet_mgr, DataDir* data_dir, CompactionType compaction_type, |
| const CumuCompactionPolicyTable& cumu_compaction_policies, uint32_t* disk_max_score) { |
| // non-lock, used in snapshot |
| return tablet_mgr->find_best_tablets_to_compaction(compaction_type, data_dir, |
| _get_tablet_set(data_dir, compaction_type), |
| disk_max_score, cumu_compaction_policies); |
| } |
| |
| bool CompactionSubmitRegistry::insert(TabletSharedPtr tablet, CompactionType compaction_type) { |
| std::unique_lock<std::mutex> l(_tablet_submitted_compaction_mutex); |
| auto& tablet_set = _get_tablet_set(tablet->data_dir(), compaction_type); |
| bool already_exist = !(tablet_set.insert(tablet).second); |
| return already_exist; |
| } |
| |
| void CompactionSubmitRegistry::remove(TabletSharedPtr tablet, CompactionType compaction_type, |
| std::function<void()> wakeup_cb) { |
| std::unique_lock<std::mutex> l(_tablet_submitted_compaction_mutex); |
| auto& tablet_set = _get_tablet_set(tablet->data_dir(), compaction_type); |
| size_t removed = tablet_set.erase(tablet); |
| if (removed == 1) { |
| wakeup_cb(); |
| } |
| } |
| |
| CompactionSubmitRegistry::TabletSet& CompactionSubmitRegistry::_get_tablet_set( |
| DataDir* dir, CompactionType compaction_type) { |
| switch (compaction_type) { |
| case CompactionType::BASE_COMPACTION: |
| return _tablet_submitted_base_compaction[dir]; |
| case CompactionType::CUMULATIVE_COMPACTION: |
| return _tablet_submitted_cumu_compaction[dir]; |
| case CompactionType::FULL_COMPACTION: |
| return _tablet_submitted_full_compaction[dir]; |
| default: |
| CHECK(false) << "invalid compaction type"; |
| } |
| } |
| |
| static int32_t get_cumu_compaction_threads_num(size_t data_dirs_num) { |
| int32_t threads_num = config::max_cumu_compaction_threads; |
| if (threads_num == -1) { |
| int32_t num_cores = doris::CpuInfo::num_cores(); |
| threads_num = std::max(cast_set<int32_t>(data_dirs_num), num_cores / 6); |
| } |
| threads_num = threads_num <= 0 ? 1 : threads_num; |
| return threads_num; |
| } |
| |
| static int32_t get_base_compaction_threads_num(size_t data_dirs_num) { |
| int32_t threads_num = config::max_base_compaction_threads; |
| if (threads_num == -1) { |
| threads_num = cast_set<int32_t>(data_dirs_num); |
| } |
| threads_num = threads_num <= 0 ? 1 : threads_num; |
| return threads_num; |
| } |
| |
| static int32_t get_single_replica_compaction_threads_num(size_t data_dirs_num) { |
| int32_t threads_num = config::max_single_replica_compaction_threads; |
| if (threads_num == -1) { |
| threads_num = cast_set<int32_t>(data_dirs_num); |
| } |
| threads_num = threads_num <= 0 ? 1 : threads_num; |
| return threads_num; |
| } |
| |
| Status StorageEngine::start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr) { |
| RETURN_IF_ERROR(Thread::create( |
| "StorageEngine", "unused_rowset_monitor_thread", |
| [this]() { this->_unused_rowset_monitor_thread_callback(); }, |
| &_unused_rowset_monitor_thread)); |
| LOG(INFO) << "unused rowset monitor thread started"; |
| |
| RETURN_IF_ERROR(Thread::create( |
| "StorageEngine", "evict_querying_rowset_thread", |
| [this]() { this->_evict_quring_rowset_thread_callback(); }, |
| &_evict_quering_rowset_thread)); |
| LOG(INFO) << "evict quering thread started"; |
| |
| // start thread for monitoring the snapshot and trash folder |
| RETURN_IF_ERROR(Thread::create( |
| "StorageEngine", "garbage_sweeper_thread", |
| [this]() { this->_garbage_sweeper_thread_callback(); }, &_garbage_sweeper_thread)); |
| LOG(INFO) << "garbage sweeper thread started"; |
| |
| // start thread for monitoring the tablet with io error |
| RETURN_IF_ERROR(Thread::create( |
| "StorageEngine", "disk_stat_monitor_thread", |
| [this]() { this->_disk_stat_monitor_thread_callback(); }, &_disk_stat_monitor_thread)); |
| LOG(INFO) << "disk stat monitor thread started"; |
| |
| // convert store map to vector |
| std::vector<DataDir*> data_dirs = get_stores(); |
| |
| auto base_compaction_threads = get_base_compaction_threads_num(data_dirs.size()); |
| auto cumu_compaction_threads = get_cumu_compaction_threads_num(data_dirs.size()); |
| auto single_replica_compaction_threads = |
| get_single_replica_compaction_threads_num(data_dirs.size()); |
| |
| RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool") |
| .set_min_threads(base_compaction_threads) |
| .set_max_threads(base_compaction_threads) |
| .build(&_base_compaction_thread_pool)); |
| RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool") |
| .set_min_threads(cumu_compaction_threads) |
| .set_max_threads(cumu_compaction_threads) |
| .build(&_cumu_compaction_thread_pool)); |
| RETURN_IF_ERROR(ThreadPoolBuilder("SingleReplicaCompactionTaskThreadPool") |
| .set_min_threads(single_replica_compaction_threads) |
| .set_max_threads(single_replica_compaction_threads) |
| .build(&_single_replica_compaction_thread_pool)); |
| |
| if (config::enable_segcompaction) { |
| RETURN_IF_ERROR(ThreadPoolBuilder("SegCompactionTaskThreadPool") |
| .set_min_threads(config::segcompaction_num_threads) |
| .set_max_threads(config::segcompaction_num_threads) |
| .build(&_seg_compaction_thread_pool)); |
| } |
| RETURN_IF_ERROR(ThreadPoolBuilder("ColdDataCompactionTaskThreadPool") |
| .set_min_threads(config::cold_data_compaction_thread_num) |
| .set_max_threads(config::cold_data_compaction_thread_num) |
| .build(&_cold_data_compaction_thread_pool)); |
| |
| // compaction tasks producer thread |
| RETURN_IF_ERROR(Thread::create( |
| "StorageEngine", "compaction_tasks_producer_thread", |
| [this]() { this->_compaction_tasks_producer_callback(); }, |
| &_compaction_tasks_producer_thread)); |
| LOG(INFO) << "compaction tasks producer thread started"; |
| |
| RETURN_IF_ERROR(Thread::create( |
| "StorageEngine", "_update_replica_infos_thread", |
| [this]() { this->_update_replica_infos_callback(); }, &_update_replica_infos_thread)); |
| LOG(INFO) << "tablet replicas info update thread started"; |
| |
| int32_t max_checkpoint_thread_num = config::max_meta_checkpoint_threads; |
| if (max_checkpoint_thread_num < 0) { |
| max_checkpoint_thread_num = cast_set<int32_t>(data_dirs.size()); |
| } |
| RETURN_IF_ERROR(ThreadPoolBuilder("TabletMetaCheckpointTaskThreadPool") |
| .set_max_threads(max_checkpoint_thread_num) |
| .build(&_tablet_meta_checkpoint_thread_pool)); |
| |
| RETURN_IF_ERROR(Thread::create( |
| "StorageEngine", "tablet_checkpoint_tasks_producer_thread", |
| [this, data_dirs]() { this->_tablet_checkpoint_callback(data_dirs); }, |
| &_tablet_checkpoint_tasks_producer_thread)); |
| LOG(INFO) << "tablet checkpoint tasks producer thread started"; |
| |
| RETURN_IF_ERROR(Thread::create( |
| "StorageEngine", "tablet_path_check_thread", |
| [this]() { this->_tablet_path_check_callback(); }, &_tablet_path_check_thread)); |
| LOG(INFO) << "tablet path check thread started"; |
| |
| // path scan and gc thread |
| if (config::path_gc_check) { |
| for (auto data_dir : get_stores()) { |
| std::shared_ptr<Thread> path_gc_thread; |
| RETURN_IF_ERROR(Thread::create( |
| "StorageEngine", "path_gc_thread", |
| [this, data_dir]() { this->_path_gc_thread_callback(data_dir); }, |
| &path_gc_thread)); |
| _path_gc_threads.emplace_back(path_gc_thread); |
| } |
| LOG(INFO) << "path gc threads started. number:" << get_stores().size(); |
| } |
| |
| RETURN_IF_ERROR(ThreadPoolBuilder("CooldownTaskThreadPool") |
| .set_min_threads(config::cooldown_thread_num) |
| .set_max_threads(config::cooldown_thread_num) |
| .build(&_cooldown_thread_pool)); |
| LOG(INFO) << "cooldown thread pool started"; |
| |
| RETURN_IF_ERROR(Thread::create( |
| "StorageEngine", "cooldown_tasks_producer_thread", |
| [this]() { this->_cooldown_tasks_producer_callback(); }, |
| &_cooldown_tasks_producer_thread)); |
| LOG(INFO) << "cooldown tasks producer thread started"; |
| |
| RETURN_IF_ERROR(Thread::create( |
| "StorageEngine", "remove_unused_remote_files_thread", |
| [this]() { this->_remove_unused_remote_files_callback(); }, |
| &_remove_unused_remote_files_thread)); |
| LOG(INFO) << "remove unused remote files thread started"; |
| |
| RETURN_IF_ERROR(Thread::create( |
| "StorageEngine", "cold_data_compaction_producer_thread", |
| [this]() { this->_cold_data_compaction_producer_callback(); }, |
| &_cold_data_compaction_producer_thread)); |
| LOG(INFO) << "cold data compaction producer thread started"; |
| |
| // add tablet publish version thread pool |
| RETURN_IF_ERROR(ThreadPoolBuilder("TabletPublishTxnThreadPool") |
| .set_min_threads(config::tablet_publish_txn_max_thread) |
| .set_max_threads(config::tablet_publish_txn_max_thread) |
| .build(&_tablet_publish_txn_thread_pool)); |
| |
| RETURN_IF_ERROR(Thread::create( |
| "StorageEngine", "async_publish_version_thread", |
| [this]() { this->_async_publish_callback(); }, &_async_publish_thread)); |
| LOG(INFO) << "async publish thread started"; |
| |
| RETURN_IF_ERROR(Thread::create( |
| "StorageEngine", "check_tablet_delete_bitmap_score_thread", |
| [this]() { this->_check_tablet_delete_bitmap_score_callback(); }, |
| &_check_delete_bitmap_score_thread)); |
| LOG(INFO) << "check tablet delete bitmap score thread started"; |
| |
| LOG(INFO) << "all storage engine's background threads are started."; |
| return Status::OK(); |
| } |
| |
| void StorageEngine::_garbage_sweeper_thread_callback() { |
| uint32_t max_interval = config::max_garbage_sweep_interval; |
| uint32_t min_interval = config::min_garbage_sweep_interval; |
| |
| if (max_interval < min_interval || min_interval <= 0) { |
| LOG(WARNING) << "garbage sweep interval config is illegal: [max=" << max_interval |
| << " min=" << min_interval << "]."; |
| min_interval = 1; |
| max_interval = max_interval >= min_interval ? max_interval : min_interval; |
| LOG(INFO) << "force reset garbage sweep interval. " |
| << "max_interval=" << max_interval << ", min_interval=" << min_interval; |
| } |
| |
| const double pi = M_PI; |
| double usage = 1.0; |
| // After the program starts, the first round of cleaning starts after min_interval. |
| uint32_t curr_interval = min_interval; |
| do { |
| // Function properties: |
| // when usage < 0.6, ratio close to 1.(interval close to max_interval) |
| // when usage at [0.6, 0.75], ratio is rapidly decreasing from 0.87 to 0.27. |
| // when usage > 0.75, ratio is slowly decreasing. |
| // when usage > 0.8, ratio close to min_interval. |
| // when usage = 0.88, ratio is approximately 0.0057. |
| double ratio = (1.1 * (pi / 2 - std::atan(usage * 100 / 5 - 14)) - 0.28) / pi; |
| ratio = ratio > 0 ? ratio : 0; |
| // TODO(dx): fix it |
| auto curr_interval_not_work = uint32_t(max_interval * ratio); |
| curr_interval_not_work = std::max(curr_interval_not_work, min_interval); |
| curr_interval_not_work = std::min(curr_interval_not_work, max_interval); |
| |
| // start clean trash and update usage. |
| Status res = start_trash_sweep(&usage); |
| if (res.ok() && _need_clean_trash.exchange(false, std::memory_order_relaxed)) { |
| res = start_trash_sweep(&usage, true); |
| } |
| |
| if (!res.ok()) { |
| LOG(WARNING) << "one or more errors occur when sweep trash." |
| << "see previous message for detail. err code=" << res; |
| // do nothing. continue next loop. |
| } |
| } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(curr_interval))); |
| } |
| |
| void StorageEngine::_disk_stat_monitor_thread_callback() { |
| int32_t interval = config::disk_stat_monitor_interval; |
| do { |
| _start_disk_stat_monitor(); |
| |
| interval = config::disk_stat_monitor_interval; |
| if (interval <= 0) { |
| LOG(WARNING) << "disk_stat_monitor_interval config is illegal: " << interval |
| << ", force set to 1"; |
| interval = 1; |
| } |
| } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))); |
| } |
| |
| void StorageEngine::_unused_rowset_monitor_thread_callback() { |
| int32_t interval = config::unused_rowset_monitor_interval; |
| do { |
| start_delete_unused_rowset(); |
| |
| interval = config::unused_rowset_monitor_interval; |
| if (interval <= 0) { |
| LOG(WARNING) << "unused_rowset_monitor_interval config is illegal: " << interval |
| << ", force set to 1"; |
| interval = 1; |
| } |
| } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))); |
| } |
| |
| int32_t StorageEngine::_auto_get_interval_by_disk_capacity(DataDir* data_dir) { |
| double disk_used = data_dir->get_usage(0); |
| double remain_used = 1 - disk_used; |
| DCHECK(remain_used >= 0 && remain_used <= 1); |
| DCHECK(config::path_gc_check_interval_second >= 0); |
| int32_t ret = 0; |
| if (remain_used > 0.9) { |
| // if config::path_gc_check_interval_second == 24h |
| ret = config::path_gc_check_interval_second; |
| } else if (remain_used > 0.7) { |
| // 12h |
| ret = config::path_gc_check_interval_second / 2; |
| } else if (remain_used > 0.5) { |
| // 6h |
| ret = config::path_gc_check_interval_second / 4; |
| } else if (remain_used > 0.3) { |
| // 4h |
| ret = config::path_gc_check_interval_second / 6; |
| } else { |
| // 3h |
| ret = config::path_gc_check_interval_second / 8; |
| } |
| return ret; |
| } |
| |
| void StorageEngine::_path_gc_thread_callback(DataDir* data_dir) { |
| LOG(INFO) << "try to start path gc thread!"; |
| time_t last_exec_time = 0; |
| do { |
| time_t current_time = time(nullptr); |
| |
| int32_t interval = _auto_get_interval_by_disk_capacity(data_dir); |
| DBUG_EXECUTE_IF("_path_gc_thread_callback.interval.eq.1ms", { |
| LOG(INFO) << "debug point change interval eq 1ms"; |
| interval = 1; |
| while (DebugPoints::instance()->is_enable("_path_gc_thread_callback.always.do")) { |
| data_dir->perform_path_gc(); |
| std::this_thread::sleep_for(std::chrono::milliseconds(10)); |
| } |
| }); |
| if (interval <= 0) { |
| LOG(WARNING) << "path gc thread check interval config is illegal:" << interval |
| << " will be forced set to half hour"; |
| interval = 1800; // 0.5 hour |
| } |
| if (current_time - last_exec_time >= interval) { |
| LOG(INFO) << "try to perform path gc! disk remain [" << 1 - data_dir->get_usage(0) |
| << "] internal [" << interval << "]"; |
| data_dir->perform_path_gc(); |
| last_exec_time = time(nullptr); |
| } |
| } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(5))); |
| LOG(INFO) << "stop path gc thread!"; |
| } |
| |
| void StorageEngine::_tablet_checkpoint_callback(const std::vector<DataDir*>& data_dirs) { |
| int64_t interval = config::generate_tablet_meta_checkpoint_tasks_interval_secs; |
| do { |
| for (auto data_dir : data_dirs) { |
| LOG(INFO) << "begin to produce tablet meta checkpoint tasks, data_dir=" |
| << data_dir->path(); |
| auto st = _tablet_meta_checkpoint_thread_pool->submit_func( |
| [data_dir, this]() { _tablet_manager->do_tablet_meta_checkpoint(data_dir); }); |
| if (!st.ok()) { |
| LOG(WARNING) << "submit tablet checkpoint tasks failed."; |
| } |
| } |
| interval = config::generate_tablet_meta_checkpoint_tasks_interval_secs; |
| } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))); |
| } |
| |
| void StorageEngine::_tablet_path_check_callback() { |
| struct TabletIdComparator { |
| bool operator()(Tablet* a, Tablet* b) { return a->tablet_id() < b->tablet_id(); } |
| }; |
| |
| using TabletQueue = std::priority_queue<Tablet*, std::vector<Tablet*>, TabletIdComparator>; |
| |
| int64_t interval = config::tablet_path_check_interval_seconds; |
| if (interval <= 0) { |
| return; |
| } |
| |
| int64_t last_tablet_id = 0; |
| do { |
| int32_t batch_size = config::tablet_path_check_batch_size; |
| if (batch_size <= 0) { |
| if (_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))) { |
| break; |
| } |
| continue; |
| } |
| |
| LOG(INFO) << "start to check tablet path"; |
| |
| auto all_tablets = _tablet_manager->get_all_tablet( |
| [](Tablet* t) { return t->is_used() && t->tablet_state() == TABLET_RUNNING; }); |
| |
| TabletQueue big_id_tablets; |
| TabletQueue small_id_tablets; |
| for (auto tablet : all_tablets) { |
| auto tablet_id = tablet->tablet_id(); |
| TabletQueue* belong_tablets = nullptr; |
| if (tablet_id > last_tablet_id) { |
| if (big_id_tablets.size() < batch_size || |
| big_id_tablets.top()->tablet_id() > tablet_id) { |
| belong_tablets = &big_id_tablets; |
| } |
| } else if (big_id_tablets.size() < batch_size) { |
| if (small_id_tablets.size() < batch_size || |
| small_id_tablets.top()->tablet_id() > tablet_id) { |
| belong_tablets = &small_id_tablets; |
| } |
| } |
| if (belong_tablets != nullptr) { |
| belong_tablets->push(tablet.get()); |
| if (belong_tablets->size() > batch_size) { |
| belong_tablets->pop(); |
| } |
| } |
| } |
| |
| int32_t need_small_id_tablet_size = |
| batch_size - static_cast<int32_t>(big_id_tablets.size()); |
| |
| if (!big_id_tablets.empty()) { |
| last_tablet_id = big_id_tablets.top()->tablet_id(); |
| } |
| while (!big_id_tablets.empty()) { |
| big_id_tablets.top()->check_tablet_path_exists(); |
| big_id_tablets.pop(); |
| } |
| |
| if (!small_id_tablets.empty() && need_small_id_tablet_size > 0) { |
| while (static_cast<int32_t>(small_id_tablets.size()) > need_small_id_tablet_size) { |
| small_id_tablets.pop(); |
| } |
| |
| last_tablet_id = small_id_tablets.top()->tablet_id(); |
| while (!small_id_tablets.empty()) { |
| small_id_tablets.top()->check_tablet_path_exists(); |
| small_id_tablets.pop(); |
| } |
| } |
| |
| } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))); |
| } |
| |
| void StorageEngine::_adjust_compaction_thread_num() { |
| TEST_SYNC_POINT_RETURN_WITH_VOID("StorageEngine::_adjust_compaction_thread_num.return_void"); |
| auto base_compaction_threads_num = get_base_compaction_threads_num(_store_map.size()); |
| if (_base_compaction_thread_pool->max_threads() != base_compaction_threads_num) { |
| int old_max_threads = _base_compaction_thread_pool->max_threads(); |
| Status status = _base_compaction_thread_pool->set_max_threads(base_compaction_threads_num); |
| if (status.ok()) { |
| VLOG_NOTICE << "update base compaction thread pool max_threads from " << old_max_threads |
| << " to " << base_compaction_threads_num; |
| } |
| } |
| if (_base_compaction_thread_pool->min_threads() != base_compaction_threads_num) { |
| int old_min_threads = _base_compaction_thread_pool->min_threads(); |
| Status status = _base_compaction_thread_pool->set_min_threads(base_compaction_threads_num); |
| if (status.ok()) { |
| VLOG_NOTICE << "update base compaction thread pool min_threads from " << old_min_threads |
| << " to " << base_compaction_threads_num; |
| } |
| } |
| |
| auto cumu_compaction_threads_num = get_cumu_compaction_threads_num(_store_map.size()); |
| if (_cumu_compaction_thread_pool->max_threads() != cumu_compaction_threads_num) { |
| int old_max_threads = _cumu_compaction_thread_pool->max_threads(); |
| Status status = _cumu_compaction_thread_pool->set_max_threads(cumu_compaction_threads_num); |
| if (status.ok()) { |
| VLOG_NOTICE << "update cumu compaction thread pool max_threads from " << old_max_threads |
| << " to " << cumu_compaction_threads_num; |
| } |
| } |
| if (_cumu_compaction_thread_pool->min_threads() != cumu_compaction_threads_num) { |
| int old_min_threads = _cumu_compaction_thread_pool->min_threads(); |
| Status status = _cumu_compaction_thread_pool->set_min_threads(cumu_compaction_threads_num); |
| if (status.ok()) { |
| VLOG_NOTICE << "update cumu compaction thread pool min_threads from " << old_min_threads |
| << " to " << cumu_compaction_threads_num; |
| } |
| } |
| |
| auto single_replica_compaction_threads_num = |
| get_single_replica_compaction_threads_num(_store_map.size()); |
| if (_single_replica_compaction_thread_pool->max_threads() != |
| single_replica_compaction_threads_num) { |
| int old_max_threads = _single_replica_compaction_thread_pool->max_threads(); |
| Status status = _single_replica_compaction_thread_pool->set_max_threads( |
| single_replica_compaction_threads_num); |
| if (status.ok()) { |
| VLOG_NOTICE << "update single replica compaction thread pool max_threads from " |
| << old_max_threads << " to " << single_replica_compaction_threads_num; |
| } |
| } |
| if (_single_replica_compaction_thread_pool->min_threads() != |
| single_replica_compaction_threads_num) { |
| int old_min_threads = _single_replica_compaction_thread_pool->min_threads(); |
| Status status = _single_replica_compaction_thread_pool->set_min_threads( |
| single_replica_compaction_threads_num); |
| if (status.ok()) { |
| VLOG_NOTICE << "update single replica compaction thread pool min_threads from " |
| << old_min_threads << " to " << single_replica_compaction_threads_num; |
| } |
| } |
| } |
| |
| void StorageEngine::_compaction_tasks_producer_callback() { |
| LOG(INFO) << "try to start compaction producer process!"; |
| |
| std::vector<DataDir*> data_dirs = get_stores(); |
| _compaction_submit_registry.reset(data_dirs); |
| |
| int round = 0; |
| CompactionType compaction_type; |
| |
| // Used to record the time when the score metric was last updated. |
| // The update of the score metric is accompanied by the logic of selecting the tablet. |
| // If there is no slot available, the logic of selecting the tablet will be terminated, |
| // which causes the score metric update to be terminated. |
| // In order to avoid this situation, we need to update the score regularly. |
| int64_t last_cumulative_score_update_time = 0; |
| int64_t last_base_score_update_time = 0; |
| static const int64_t check_score_interval_ms = 5000; // 5 secs |
| |
| int64_t interval = config::generate_compaction_tasks_interval_ms; |
| do { |
| int64_t cur_time = UnixMillis(); |
| if (!config::disable_auto_compaction && |
| (!config::enable_compaction_pause_on_high_memory || |
| !GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE))) { |
| _adjust_compaction_thread_num(); |
| |
| bool check_score = false; |
| if (round < config::cumulative_compaction_rounds_for_each_base_compaction_round) { |
| compaction_type = CompactionType::CUMULATIVE_COMPACTION; |
| round++; |
| if (cur_time - last_cumulative_score_update_time >= check_score_interval_ms) { |
| check_score = true; |
| last_cumulative_score_update_time = cur_time; |
| } |
| } else { |
| compaction_type = CompactionType::BASE_COMPACTION; |
| round = 0; |
| if (cur_time - last_base_score_update_time >= check_score_interval_ms) { |
| check_score = true; |
| last_base_score_update_time = cur_time; |
| } |
| } |
| std::unique_ptr<ThreadPool>& thread_pool = |
| (compaction_type == CompactionType::CUMULATIVE_COMPACTION) |
| ? _cumu_compaction_thread_pool |
| : _base_compaction_thread_pool; |
| bvar::Status<int64_t>& g_compaction_task_num_per_round = |
| (compaction_type == CompactionType::CUMULATIVE_COMPACTION) |
| ? g_cumu_compaction_task_num_per_round |
| : g_base_compaction_task_num_per_round; |
| if (config::compaction_num_per_round != -1) { |
| _compaction_num_per_round = config::compaction_num_per_round; |
| } else if (thread_pool->get_queue_size() == 0) { |
| // If all tasks in the thread pool queue are executed, |
| // double the number of tasks generated each time, |
| // with a maximum of config::max_automatic_compaction_num_per_round tasks per generation. |
| if (_compaction_num_per_round < config::max_automatic_compaction_num_per_round) { |
| _compaction_num_per_round *= 2; |
| g_compaction_task_num_per_round.set_value(_compaction_num_per_round); |
| } |
| } else if (thread_pool->get_queue_size() > _compaction_num_per_round / 2) { |
| // If all tasks in the thread pool is greater than |
| // half of the tasks submitted in the previous round, |
| // reduce the number of tasks generated each time by half, with a minimum of 1. |
| if (_compaction_num_per_round > 1) { |
| _compaction_num_per_round /= 2; |
| g_compaction_task_num_per_round.set_value(_compaction_num_per_round); |
| } |
| } |
| std::vector<TabletSharedPtr> tablets_compaction = |
| _generate_compaction_tasks(compaction_type, data_dirs, check_score); |
| if (tablets_compaction.size() == 0) { |
| std::unique_lock<std::mutex> lock(_compaction_producer_sleep_mutex); |
| _wakeup_producer_flag = 0; |
| // It is necessary to wake up the thread on timeout to prevent deadlock |
| // in case of no running compaction task. |
| _compaction_producer_sleep_cv.wait_for( |
| lock, std::chrono::milliseconds(2000), |
| [this] { return _wakeup_producer_flag == 1; }); |
| continue; |
| } |
| |
| for (const auto& tablet : tablets_compaction) { |
| if (compaction_type == CompactionType::BASE_COMPACTION) { |
| tablet->set_last_base_compaction_schedule_time(UnixMillis()); |
| } else if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) { |
| tablet->set_last_cumu_compaction_schedule_time(UnixMillis()); |
| } else if (compaction_type == CompactionType::FULL_COMPACTION) { |
| tablet->set_last_full_compaction_schedule_time(UnixMillis()); |
| } |
| Status st = _submit_compaction_task(tablet, compaction_type, false); |
| if (!st.ok()) { |
| LOG(WARNING) << "failed to submit compaction task for tablet: " |
| << tablet->tablet_id() << ", err: " << st; |
| } |
| } |
| interval = config::generate_compaction_tasks_interval_ms; |
| } else { |
| interval = 5000; // 5s to check disable_auto_compaction |
| } |
| |
| // wait some seconds for ut test |
| { |
| std ::vector<std ::any> args {}; |
| args.emplace_back(1); |
| doris ::SyncPoint ::get_instance()->process( |
| "StorageEngine::_compaction_tasks_producer_callback", std ::move(args)); |
| } |
| int64_t end_time = UnixMillis(); |
| DorisMetrics::instance()->compaction_producer_callback_a_round_time->set_value(end_time - |
| cur_time); |
| } while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(interval))); |
| } |
| |
| void StorageEngine::_update_replica_infos_callback() { |
| #ifdef GOOGLE_PROFILER |
| ProfilerRegisterThread(); |
| #endif |
| LOG(INFO) << "start to update replica infos!"; |
| |
| int64_t interval = config::update_replica_infos_interval_seconds; |
| do { |
| auto all_tablets = _tablet_manager->get_all_tablet([](Tablet* t) { |
| return t->is_used() && t->tablet_state() == TABLET_RUNNING && |
| !t->tablet_meta()->tablet_schema()->disable_auto_compaction() && |
| t->tablet_meta()->tablet_schema()->enable_single_replica_compaction(); |
| }); |
| ClusterInfo* cluster_info = ExecEnv::GetInstance()->cluster_info(); |
| if (cluster_info == nullptr) { |
| LOG(WARNING) << "Have not get FE Master heartbeat yet"; |
| std::this_thread::sleep_for(std::chrono::seconds(2)); |
| continue; |
| } |
| TNetworkAddress master_addr = cluster_info->master_fe_addr; |
| if (master_addr.hostname == "" || master_addr.port == 0) { |
| LOG(WARNING) << "Have not get FE Master heartbeat yet"; |
| std::this_thread::sleep_for(std::chrono::seconds(2)); |
| continue; |
| } |
| |
| int start = 0; |
| int tablet_size = cast_set<int>(all_tablets.size()); |
| // The while loop may take a long time, we should skip it when stop |
| while (start < tablet_size && _stop_background_threads_latch.count() > 0) { |
| int batch_size = std::min(100, tablet_size - start); |
| int end = start + batch_size; |
| TGetTabletReplicaInfosRequest request; |
| TGetTabletReplicaInfosResult result; |
| for (int i = start; i < end; i++) { |
| request.tablet_ids.emplace_back(all_tablets[i]->tablet_id()); |
| } |
| Status rpc_st = ThriftRpcHelper::rpc<FrontendServiceClient>( |
| master_addr.hostname, master_addr.port, |
| [&request, &result](FrontendServiceConnection& client) { |
| client->getTabletReplicaInfos(result, request); |
| }); |
| |
| if (!rpc_st.ok()) { |
| LOG(WARNING) << "Failed to get tablet replica infos, encounter rpc failure, " |
| "tablet start: " |
| << start << " end: " << end; |
| continue; |
| } |
| |
| std::unique_lock<std::mutex> lock(_peer_replica_infos_mutex); |
| for (const auto& it : result.tablet_replica_infos) { |
| auto tablet_id = it.first; |
| auto tablet = _tablet_manager->get_tablet(tablet_id); |
| if (tablet == nullptr) { |
| VLOG_CRITICAL << "tablet ptr is nullptr"; |
| continue; |
| } |
| |
| VLOG_NOTICE << tablet_id << " tablet has " << it.second.size() << " replicas"; |
| uint64_t min_modulo = MOD_PRIME; |
| TReplicaInfo peer_replica; |
| for (const auto& replica : it.second) { |
| int64_t peer_replica_id = replica.replica_id; |
| uint64_t modulo = HashUtil::hash64(&peer_replica_id, sizeof(peer_replica_id), |
| DEFAULT_SEED) % |
| MOD_PRIME; |
| if (modulo < min_modulo) { |
| peer_replica = replica; |
| min_modulo = modulo; |
| } |
| } |
| VLOG_NOTICE << "tablet " << tablet_id << ", peer replica host is " |
| << peer_replica.host; |
| _peer_replica_infos[tablet_id] = peer_replica; |
| } |
| _token = result.token; |
| VLOG_NOTICE << "get tablet replica infos from fe, size is " << end - start |
| << " token = " << result.token; |
| start = end; |
| } |
| interval = config::update_replica_infos_interval_seconds; |
| } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))); |
| } |
| |
| Status StorageEngine::_submit_single_replica_compaction_task(TabletSharedPtr tablet, |
| CompactionType compaction_type) { |
| // For single replica compaction, the local version to be merged is determined based on the version fetched from the peer replica. |
| // Therefore, it is currently not possible to determine whether it should be a base compaction or cumulative compaction. |
| // As a result, the tablet needs to be pushed to both the _tablet_submitted_cumu_compaction and the _tablet_submitted_base_compaction simultaneously. |
| bool already_exist = |
| _compaction_submit_registry.insert(tablet, CompactionType::CUMULATIVE_COMPACTION); |
| if (already_exist) { |
| return Status::AlreadyExist<false>( |
| "compaction task has already been submitted, tablet_id={}", tablet->tablet_id()); |
| } |
| |
| already_exist = _compaction_submit_registry.insert(tablet, CompactionType::BASE_COMPACTION); |
| if (already_exist) { |
| _pop_tablet_from_submitted_compaction(tablet, CompactionType::CUMULATIVE_COMPACTION); |
| return Status::AlreadyExist<false>( |
| "compaction task has already been submitted, tablet_id={}", tablet->tablet_id()); |
| } |
| |
| auto compaction = std::make_shared<SingleReplicaCompaction>(*this, tablet, compaction_type); |
| DorisMetrics::instance()->single_compaction_request_total->increment(1); |
| auto st = compaction->prepare_compact(); |
| |
| auto clean_single_replica_compaction = [tablet, this]() { |
| _pop_tablet_from_submitted_compaction(tablet, CompactionType::CUMULATIVE_COMPACTION); |
| _pop_tablet_from_submitted_compaction(tablet, CompactionType::BASE_COMPACTION); |
| }; |
| |
| if (!st.ok()) { |
| clean_single_replica_compaction(); |
| if (!st.is<ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION>()) { |
| LOG(WARNING) << "failed to prepare single replica compaction, tablet_id=" |
| << tablet->tablet_id() << " : " << st; |
| return st; |
| } |
| return Status::OK(); // No suitable version, regard as OK |
| } |
| |
| auto submit_st = _single_replica_compaction_thread_pool->submit_func( |
| [tablet, compaction = std::move(compaction), |
| clean_single_replica_compaction]() mutable { |
| tablet->execute_single_replica_compaction(*compaction); |
| clean_single_replica_compaction(); |
| }); |
| if (!submit_st.ok()) { |
| clean_single_replica_compaction(); |
| return Status::InternalError( |
| "failed to submit single replica compaction task to thread pool, " |
| "tablet_id={}", |
| tablet->tablet_id()); |
| } |
| return Status::OK(); |
| } |
| |
| void StorageEngine::get_tablet_rowset_versions(const PGetTabletVersionsRequest* request, |
| PGetTabletVersionsResponse* response) { |
| TabletSharedPtr tablet = _tablet_manager->get_tablet(request->tablet_id()); |
| if (tablet == nullptr) { |
| response->mutable_status()->set_status_code(TStatusCode::CANCELLED); |
| return; |
| } |
| std::vector<Version> local_versions = tablet->get_all_local_versions(); |
| for (const auto& local_version : local_versions) { |
| auto version = response->add_versions(); |
| version->set_first(local_version.first); |
| version->set_second(local_version.second); |
| } |
| response->mutable_status()->set_status_code(0); |
| } |
| |
| bool need_generate_compaction_tasks(int task_cnt_per_disk, int thread_per_disk, |
| CompactionType compaction_type, bool all_base) { |
| // We need to reserve at least one Slot for cumulative compaction. |
| // So when there is only one Slot, we have to judge whether there is a cumulative compaction |
| // in the current submitted tasks. |
| // If so, the last Slot can be assigned to Base compaction, |
| // otherwise, this Slot needs to be reserved for cumulative compaction. |
| if (task_cnt_per_disk >= thread_per_disk) { |
| // Return if no available slot |
| return false; |
| } else if (task_cnt_per_disk >= thread_per_disk - 1) { |
| // Only one slot left, check if it can be assigned to base compaction task. |
| if (compaction_type == CompactionType::BASE_COMPACTION) { |
| if (all_base) { |
| return false; |
| } |
| } |
| } |
| return true; |
| } |
| |
| int get_concurrent_per_disk(int max_score, int thread_per_disk) { |
| if (!config::enable_compaction_priority_scheduling) { |
| return thread_per_disk; |
| } |
| |
| double load_average = 0; |
| if (DorisMetrics::instance()->system_metrics() != nullptr) { |
| load_average = DorisMetrics::instance()->system_metrics()->get_load_average_1_min(); |
| } |
| int num_cores = doris::CpuInfo::num_cores(); |
| bool cpu_usage_high = load_average > num_cores * 0.8; |
| |
| auto process_memory_usage = doris::GlobalMemoryArbitrator::process_memory_usage(); |
| bool memory_usage_high = static_cast<double>(process_memory_usage) > |
| static_cast<double>(MemInfo::soft_mem_limit()) * 0.8; |
| |
| if (max_score <= config::low_priority_compaction_score_threshold && |
| (cpu_usage_high || memory_usage_high)) { |
| return config::low_priority_compaction_task_num_per_disk; |
| } |
| |
| return thread_per_disk; |
| } |
| |
| int32_t disk_compaction_slot_num(const DataDir& data_dir) { |
| return data_dir.is_ssd_disk() ? config::compaction_task_num_per_fast_disk |
| : config::compaction_task_num_per_disk; |
| } |
| |
| bool has_free_compaction_slot(CompactionSubmitRegistry* registry, DataDir* dir, |
| CompactionType compaction_type, uint32_t executing_cnt) { |
| int32_t thread_per_disk = disk_compaction_slot_num(*dir); |
| return need_generate_compaction_tasks( |
| executing_cnt, thread_per_disk, compaction_type, |
| !registry->has_compaction_task(dir, CompactionType::CUMULATIVE_COMPACTION)); |
| } |
| |
| std::vector<TabletSharedPtr> StorageEngine::_generate_compaction_tasks( |
| CompactionType compaction_type, std::vector<DataDir*>& data_dirs, bool check_score) { |
| TEST_SYNC_POINT_RETURN_WITH_VALUE("olap_server::_generate_compaction_tasks.return_empty", |
| std::vector<TabletSharedPtr> {}); |
| _update_cumulative_compaction_policy(); |
| std::vector<TabletSharedPtr> tablets_compaction; |
| uint32_t max_compaction_score = 0; |
| |
| std::random_device rd; |
| std::mt19937 g(rd()); |
| std::shuffle(data_dirs.begin(), data_dirs.end(), g); |
| |
| // Copy _tablet_submitted_xxx_compaction map so that we don't need to hold _tablet_submitted_compaction_mutex |
| // when traversing the data dir |
| auto compaction_registry_snapshot = _compaction_submit_registry.create_snapshot(); |
| for (auto* data_dir : data_dirs) { |
| bool need_pick_tablet = true; |
| uint32_t executing_task_num = |
| compaction_registry_snapshot.count_executing_cumu_and_base(data_dir); |
| need_pick_tablet = has_free_compaction_slot(&compaction_registry_snapshot, data_dir, |
| compaction_type, executing_task_num); |
| if (!need_pick_tablet && !check_score) { |
| continue; |
| } |
| |
| // Even if need_pick_tablet is false, we still need to call find_best_tablet_to_compaction(), |
| // So that we can update the max_compaction_score metric. |
| if (!data_dir->reach_capacity_limit(0)) { |
| uint32_t disk_max_score = 0; |
| auto tablets = compaction_registry_snapshot.pick_topn_tablets_for_compaction( |
| _tablet_manager.get(), data_dir, compaction_type, |
| _cumulative_compaction_policies, &disk_max_score); |
| int concurrent_num = |
| get_concurrent_per_disk(disk_max_score, disk_compaction_slot_num(*data_dir)); |
| need_pick_tablet = need_generate_compaction_tasks( |
| executing_task_num, concurrent_num, compaction_type, |
| !compaction_registry_snapshot.has_compaction_task( |
| data_dir, CompactionType::CUMULATIVE_COMPACTION)); |
| for (const auto& tablet : tablets) { |
| if (tablet != nullptr) { |
| if (need_pick_tablet) { |
| tablets_compaction.emplace_back(tablet); |
| } |
| max_compaction_score = std::max(max_compaction_score, disk_max_score); |
| } |
| } |
| } |
| } |
| |
| if (max_compaction_score > 0) { |
| if (compaction_type == CompactionType::BASE_COMPACTION) { |
| DorisMetrics::instance()->tablet_base_max_compaction_score->set_value( |
| max_compaction_score); |
| } else { |
| DorisMetrics::instance()->tablet_cumulative_max_compaction_score->set_value( |
| max_compaction_score); |
| } |
| } |
| return tablets_compaction; |
| } |
| |
| void StorageEngine::_update_cumulative_compaction_policy() { |
| if (_cumulative_compaction_policies.empty()) { |
| _cumulative_compaction_policies[CUMULATIVE_SIZE_BASED_POLICY] = |
| CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy( |
| CUMULATIVE_SIZE_BASED_POLICY); |
| _cumulative_compaction_policies[CUMULATIVE_TIME_SERIES_POLICY] = |
| CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy( |
| CUMULATIVE_TIME_SERIES_POLICY); |
| } |
| } |
| |
| void StorageEngine::_pop_tablet_from_submitted_compaction(TabletSharedPtr tablet, |
| CompactionType compaction_type) { |
| _compaction_submit_registry.remove(tablet, compaction_type, [this]() { |
| std::unique_lock<std::mutex> lock(_compaction_producer_sleep_mutex); |
| _wakeup_producer_flag = 1; |
| _compaction_producer_sleep_cv.notify_one(); |
| }); |
| } |
| |
| Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet, |
| CompactionType compaction_type, bool force) { |
| if (tablet->tablet_meta()->tablet_schema()->enable_single_replica_compaction() && |
| should_fetch_from_peer(tablet->tablet_id())) { |
| VLOG_CRITICAL << "start to submit single replica compaction task for tablet: " |
| << tablet->tablet_id(); |
| Status st = _submit_single_replica_compaction_task(tablet, compaction_type); |
| if (!st.ok()) { |
| LOG(WARNING) << "failed to submit single replica compaction task for tablet: " |
| << tablet->tablet_id() << ", err: " << st; |
| } |
| |
| return Status::OK(); |
| } |
| bool already_exist = _compaction_submit_registry.insert(tablet, compaction_type); |
| if (already_exist) { |
| return Status::AlreadyExist<false>( |
| "compaction task has already been submitted, tablet_id={}, compaction_type={}.", |
| tablet->tablet_id(), compaction_type); |
| } |
| tablet->compaction_stage = CompactionStage::PENDING; |
| std::shared_ptr<CompactionMixin> compaction; |
| int64_t permits = 0; |
| Status st = Tablet::prepare_compaction_and_calculate_permits(compaction_type, tablet, |
| compaction, permits); |
| if (st.ok() && permits > 0) { |
| if (!force) { |
| _permit_limiter.request(permits); |
| } |
| std::unique_ptr<ThreadPool>& thread_pool = |
| (compaction_type == CompactionType::CUMULATIVE_COMPACTION) |
| ? _cumu_compaction_thread_pool |
| : _base_compaction_thread_pool; |
| VLOG_CRITICAL << "compaction thread pool. type: " |
| << (compaction_type == CompactionType::CUMULATIVE_COMPACTION ? "CUMU" |
| : "BASE") |
| << ", num_threads: " << thread_pool->num_threads() |
| << ", num_threads_pending_start: " << thread_pool->num_threads_pending_start() |
| << ", num_active_threads: " << thread_pool->num_active_threads() |
| << ", max_threads: " << thread_pool->max_threads() |
| << ", min_threads: " << thread_pool->min_threads() |
| << ", num_total_queued_tasks: " << thread_pool->get_queue_size(); |
| auto status = thread_pool->submit_func([tablet, compaction = std::move(compaction), |
| compaction_type, permits, force, this]() { |
| if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) [[likely]] { |
| DorisMetrics::instance()->cumulative_compaction_task_running_total->increment(1); |
| DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value( |
| _cumu_compaction_thread_pool->get_queue_size()); |
| } else if (compaction_type == CompactionType::BASE_COMPACTION) { |
| DorisMetrics::instance()->base_compaction_task_running_total->increment(1); |
| DorisMetrics::instance()->base_compaction_task_pending_total->set_value( |
| _base_compaction_thread_pool->get_queue_size()); |
| } |
| bool is_large_task = true; |
| Defer defer {[&]() { |
| DBUG_EXECUTE_IF("StorageEngine._submit_compaction_task.sleep", { sleep(5); }) |
| if (!force) { |
| _permit_limiter.release(permits); |
| } |
| _pop_tablet_from_submitted_compaction(tablet, compaction_type); |
| tablet->compaction_stage = CompactionStage::NOT_SCHEDULED; |
| if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) { |
| std::lock_guard<std::mutex> lock(_cumu_compaction_delay_mtx); |
| _cumu_compaction_thread_pool_used_threads--; |
| if (!is_large_task) { |
| _cumu_compaction_thread_pool_small_tasks_running--; |
| } |
| DorisMetrics::instance()->cumulative_compaction_task_running_total->increment( |
| -1); |
| DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value( |
| _cumu_compaction_thread_pool->get_queue_size()); |
| } else if (compaction_type == CompactionType::BASE_COMPACTION) { |
| DorisMetrics::instance()->base_compaction_task_running_total->increment(-1); |
| DorisMetrics::instance()->base_compaction_task_pending_total->set_value( |
| _base_compaction_thread_pool->get_queue_size()); |
| } |
| }}; |
| do { |
| if (compaction->compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) { |
| std::lock_guard<std::mutex> lock(_cumu_compaction_delay_mtx); |
| _cumu_compaction_thread_pool_used_threads++; |
| if (config::large_cumu_compaction_task_min_thread_num > 1 && |
| _cumu_compaction_thread_pool->max_threads() >= |
| config::large_cumu_compaction_task_min_thread_num) { |
| // Determine if this is a large task based on configured thresholds |
| is_large_task = |
| (compaction->calc_input_rowsets_total_size() > |
| config::large_cumu_compaction_task_bytes_threshold || |
| compaction->calc_input_rowsets_row_num() > |
| config::large_cumu_compaction_task_row_num_threshold); |
| |
| // Small task. No delay needed |
| if (!is_large_task) { |
| _cumu_compaction_thread_pool_small_tasks_running++; |
| break; |
| } |
| // Deal with large task |
| if (_should_delay_large_task()) { |
| LOG_WARNING( |
| "failed to do CumulativeCompaction, cumu thread pool is " |
| "intensive, delay large task.") |
| .tag("tablet_id", tablet->tablet_id()) |
| .tag("input_rows", compaction->calc_input_rowsets_row_num()) |
| .tag("input_rowsets_total_size", |
| compaction->calc_input_rowsets_total_size()) |
| .tag("config::large_cumu_compaction_task_bytes_threshold", |
| config::large_cumu_compaction_task_bytes_threshold) |
| .tag("config::large_cumu_compaction_task_row_num_threshold", |
| config::large_cumu_compaction_task_row_num_threshold) |
| .tag("remaining threads", |
| _cumu_compaction_thread_pool_used_threads) |
| .tag("small_tasks_running", |
| _cumu_compaction_thread_pool_small_tasks_running); |
| // Delay this task and sleep 5s for this tablet |
| long now = duration_cast<std::chrono::milliseconds>( |
| std::chrono::system_clock::now().time_since_epoch()) |
| .count(); |
| tablet->set_last_cumu_compaction_failure_time(now); |
| return; |
| } |
| } |
| } |
| } while (false); |
| if (!tablet->can_do_compaction(tablet->data_dir()->path_hash(), compaction_type)) { |
| LOG(INFO) << "Tablet state has been changed, no need to begin this compaction " |
| "task, tablet_id=" |
| << tablet->tablet_id() << ", tablet_state=" << tablet->tablet_state(); |
| return; |
| } |
| tablet->compaction_stage = CompactionStage::EXECUTING; |
| TEST_SYNC_POINT_RETURN_WITH_VOID("olap_server::execute_compaction"); |
| tablet->execute_compaction(*compaction); |
| }); |
| if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) [[likely]] { |
| DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value( |
| _cumu_compaction_thread_pool->get_queue_size()); |
| } else if (compaction_type == CompactionType::BASE_COMPACTION) { |
| DorisMetrics::instance()->base_compaction_task_pending_total->set_value( |
| _base_compaction_thread_pool->get_queue_size()); |
| } |
| if (!st.ok()) { |
| if (!force) { |
| _permit_limiter.release(permits); |
| } |
| _pop_tablet_from_submitted_compaction(tablet, compaction_type); |
| tablet->compaction_stage = CompactionStage::NOT_SCHEDULED; |
| return Status::InternalError( |
| "failed to submit compaction task to thread pool, " |
| "tablet_id={}, compaction_type={}.", |
| tablet->tablet_id(), compaction_type); |
| } |
| return Status::OK(); |
| } else { |
| _pop_tablet_from_submitted_compaction(tablet, compaction_type); |
| tablet->compaction_stage = CompactionStage::NOT_SCHEDULED; |
| if (!st.ok()) { |
| return Status::InternalError( |
| "failed to prepare compaction task and calculate permits, " |
| "tablet_id={}, compaction_type={}, " |
| "permit={}, current_permit={}, status={}", |
| tablet->tablet_id(), compaction_type, permits, _permit_limiter.usage(), |
| st.to_string()); |
| } |
| return st; |
| } |
| } |
| |
| Status StorageEngine::submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type, |
| bool force, bool eager) { |
| if (!eager) { |
| DCHECK(compaction_type == CompactionType::BASE_COMPACTION || |
| compaction_type == CompactionType::CUMULATIVE_COMPACTION); |
| auto compaction_registry_snapshot = _compaction_submit_registry.create_snapshot(); |
| auto stores = get_stores(); |
| |
| bool is_busy = std::none_of( |
| stores.begin(), stores.end(), |
| [&compaction_registry_snapshot, compaction_type](auto* data_dir) { |
| return has_free_compaction_slot( |
| &compaction_registry_snapshot, data_dir, compaction_type, |
| compaction_registry_snapshot.count_executing_cumu_and_base(data_dir)); |
| }); |
| if (is_busy) { |
| LOG_EVERY_N(WARNING, 100) |
| << "Too busy to submit a compaction task, tablet=" << tablet->get_table_id(); |
| return Status::OK(); |
| } |
| } |
| _update_cumulative_compaction_policy(); |
| // alter table tableName set ("compaction_policy"="time_series") |
| // if atler table's compaction policy, we need to modify tablet compaction policy shared ptr |
| if (tablet->get_cumulative_compaction_policy() == nullptr || |
| tablet->get_cumulative_compaction_policy()->name() != |
| tablet->tablet_meta()->compaction_policy()) { |
| tablet->set_cumulative_compaction_policy( |
| _cumulative_compaction_policies.at(tablet->tablet_meta()->compaction_policy())); |
| } |
| tablet->set_skip_compaction(false); |
| return _submit_compaction_task(tablet, compaction_type, force); |
| } |
| |
| Status StorageEngine::_handle_seg_compaction(std::shared_ptr<SegcompactionWorker> worker, |
| SegCompactionCandidatesSharedPtr segments, |
| uint64_t submission_time) { |
| // note: be aware that worker->_writer maybe released when the task is cancelled |
| uint64_t exec_queue_time = GetCurrentTimeMicros() - submission_time; |
| LOG(INFO) << "segcompaction thread pool queue time(ms): " << exec_queue_time / 1000; |
| worker->compact_segments(segments); |
| // return OK here. error will be reported via BetaRowsetWriter::_segcompaction_status |
| return Status::OK(); |
| } |
| |
| Status StorageEngine::submit_seg_compaction_task(std::shared_ptr<SegcompactionWorker> worker, |
| SegCompactionCandidatesSharedPtr segments) { |
| uint64_t submission_time = GetCurrentTimeMicros(); |
| return _seg_compaction_thread_pool->submit_func([this, worker, segments, submission_time] { |
| static_cast<void>(_handle_seg_compaction(worker, segments, submission_time)); |
| }); |
| } |
| |
| Status StorageEngine::process_index_change_task(const TAlterInvertedIndexReq& request) { |
| auto tablet_id = request.tablet_id; |
| TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id); |
| DBUG_EXECUTE_IF("StorageEngine::process_index_change_task_tablet_nullptr", |
| { tablet = nullptr; }) |
| if (tablet == nullptr) { |
| LOG(WARNING) << "tablet: " << tablet_id << " not exist"; |
| return Status::InternalError("tablet not exist, tablet_id={}.", tablet_id); |
| } |
| |
| IndexBuilderSharedPtr index_builder = std::make_shared<IndexBuilder>( |
| *this, tablet, request.columns, request.alter_inverted_indexes, request.is_drop_op); |
| RETURN_IF_ERROR(_handle_index_change(index_builder)); |
| return Status::OK(); |
| } |
| |
| Status StorageEngine::_handle_index_change(IndexBuilderSharedPtr index_builder) { |
| RETURN_IF_ERROR(index_builder->init()); |
| RETURN_IF_ERROR(index_builder->do_build_inverted_index()); |
| return Status::OK(); |
| } |
| |
| void StorageEngine::_cooldown_tasks_producer_callback() { |
| int64_t interval = config::generate_cooldown_task_interval_sec; |
| // the cooldown replica may be slow to upload it's meta file, so we should wait |
| // until it has done uploaded |
| int64_t skip_failed_interval = interval * 10; |
| do { |
| // these tables are ordered by priority desc |
| std::vector<TabletSharedPtr> tablets; |
| std::vector<RowsetSharedPtr> rowsets; |
| // TODO(luwei) : a more efficient way to get cooldown tablets |
| auto cur_time = time(nullptr); |
| // we should skip all the tablets which are not running and those pending to do cooldown |
| // also tablets once failed to do follow cooldown |
| auto skip_tablet = [this, skip_failed_interval, |
| cur_time](const TabletSharedPtr& tablet) -> bool { |
| bool is_skip = |
| cur_time - tablet->last_failed_follow_cooldown_time() < skip_failed_interval || |
| TABLET_RUNNING != tablet->tablet_state(); |
| if (is_skip) { |
| return is_skip; |
| } |
| std::lock_guard<std::mutex> lock(_running_cooldown_mutex); |
| return _running_cooldown_tablets.find(tablet->tablet_id()) != |
| _running_cooldown_tablets.end(); |
| }; |
| _tablet_manager->get_cooldown_tablets(&tablets, &rowsets, std::move(skip_tablet)); |
| LOG(INFO) << "cooldown producer get tablet num: " << tablets.size(); |
| int max_priority = cast_set<int>(tablets.size()); |
| int index = 0; |
| for (const auto& tablet : tablets) { |
| { |
| std::lock_guard<std::mutex> lock(_running_cooldown_mutex); |
| _running_cooldown_tablets.insert(tablet->tablet_id()); |
| } |
| PriorityThreadPool::Task task; |
| RowsetSharedPtr rowset = std::move(rowsets[index++]); |
| task.work_function = [tablet, rowset, task_size = tablets.size(), this]() { |
| Status st = tablet->cooldown(rowset); |
| { |
| std::lock_guard<std::mutex> lock(_running_cooldown_mutex); |
| _running_cooldown_tablets.erase(tablet->tablet_id()); |
| } |
| if (!st.ok()) { |
| LOG(WARNING) << "failed to cooldown, tablet: " << tablet->tablet_id() |
| << " err: " << st; |
| } else { |
| LOG(INFO) << "succeed to cooldown, tablet: " << tablet->tablet_id() |
| << " cooldown progress (" |
| << task_size - _cooldown_thread_pool->get_queue_size() << "/" |
| << task_size << ")"; |
| } |
| }; |
| task.priority = max_priority--; |
| bool submited = _cooldown_thread_pool->offer(std::move(task)); |
| |
| if (!submited) { |
| LOG(INFO) << "failed to submit cooldown task"; |
| } |
| } |
| } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))); |
| } |
| |
| void StorageEngine::_remove_unused_remote_files_callback() { |
| while (!_stop_background_threads_latch.wait_for( |
| std::chrono::seconds(config::remove_unused_remote_files_interval_sec))) { |
| LOG(INFO) << "begin to remove unused remote files"; |
| do_remove_unused_remote_files(); |
| } |
| } |
| |
| void StorageEngine::do_remove_unused_remote_files() { |
| auto tablets = tablet_manager()->get_all_tablet([](Tablet* t) { |
| return t->tablet_meta()->cooldown_meta_id().initialized() && t->is_used() && |
| t->tablet_state() == TABLET_RUNNING && |
| t->cooldown_conf_unlocked().cooldown_replica_id == t->replica_id(); |
| }); |
| TConfirmUnusedRemoteFilesRequest req; |
| req.__isset.confirm_list = true; |
| // tablet_id -> [storage_resource, unused_remote_files] |
| using unused_remote_files_buffer_t = |
| std::unordered_map<int64_t, std::pair<StorageResource, std::vector<io::FileInfo>>>; |
| unused_remote_files_buffer_t buffer; |
| int64_t num_files_in_buffer = 0; |
| // assume a filename is 0.1KB, buffer size should not larger than 100MB |
| constexpr int64_t max_files_in_buffer = 1000000; |
| |
| auto calc_unused_remote_files = [&req, &buffer, &num_files_in_buffer, this](Tablet* t) { |
| auto storage_resource = get_resource_by_storage_policy_id(t->storage_policy_id()); |
| if (!storage_resource) { |
| LOG(WARNING) << "encounter error when remove unused remote files, tablet_id=" |
| << t->tablet_id() << " : " << storage_resource.error(); |
| return; |
| } |
| |
| // TODO(plat1ko): Support path v1 |
| if (storage_resource->path_version > 0) { |
| return; |
| } |
| |
| std::vector<io::FileInfo> files; |
| // FIXME(plat1ko): What if user reset resource in storage policy to another resource? |
| // Maybe we should also list files in previously uploaded resources. |
| bool exists = true; |
| auto st = storage_resource->fs->list(storage_resource->remote_tablet_path(t->tablet_id()), |
| true, &files, &exists); |
| if (!st.ok()) { |
| LOG(WARNING) << "encounter error when remove unused remote files, tablet_id=" |
| << t->tablet_id() << " : " << st; |
| return; |
| } |
| if (!exists || files.empty()) { |
| return; |
| } |
| // get all cooldowned rowsets |
| RowsetIdUnorderedSet cooldowned_rowsets; |
| UniqueId cooldown_meta_id; |
| { |
| std::shared_lock rlock(t->get_header_lock()); |
| for (const auto& [_, rs_meta] : t->tablet_meta()->all_rs_metas()) { |
| if (!rs_meta->is_local()) { |
| cooldowned_rowsets.insert(rs_meta->rowset_id()); |
| } |
| } |
| if (cooldowned_rowsets.empty()) { |
| return; |
| } |
| cooldown_meta_id = t->tablet_meta()->cooldown_meta_id(); |
| } |
| auto [cooldown_term, cooldown_replica_id] = t->cooldown_conf(); |
| if (cooldown_replica_id != t->replica_id()) { |
| return; |
| } |
| // {cooldown_replica_id}.{cooldown_term}.meta |
| std::string remote_meta_path = |
| cooldown_tablet_meta_filename(cooldown_replica_id, cooldown_term); |
| // filter out the paths that should be reserved |
| auto filter = [&, this](io::FileInfo& info) { |
| std::string_view filename = info.file_name; |
| if (filename.ends_with(".meta")) { |
| return filename == remote_meta_path; |
| } |
| auto rowset_id = extract_rowset_id(filename); |
| if (rowset_id.hi == 0) { |
| return false; |
| } |
| return cooldowned_rowsets.contains(rowset_id) || |
| pending_remote_rowsets().contains(rowset_id); |
| }; |
| files.erase(std::remove_if(files.begin(), files.end(), std::move(filter)), files.end()); |
| if (files.empty()) { |
| return; |
| } |
| files.shrink_to_fit(); |
| num_files_in_buffer += files.size(); |
| buffer.insert({t->tablet_id(), {*storage_resource, std::move(files)}}); |
| auto& info = req.confirm_list.emplace_back(); |
| info.__set_tablet_id(t->tablet_id()); |
| info.__set_cooldown_replica_id(cooldown_replica_id); |
| info.__set_cooldown_meta_id(cooldown_meta_id.to_thrift()); |
| }; |
| |
| auto confirm_and_remove_files = [&buffer, &req, &num_files_in_buffer]() { |
| TConfirmUnusedRemoteFilesResult result; |
| LOG(INFO) << "begin to confirm unused remote files. num_tablets=" << buffer.size() |
| << " num_files=" << num_files_in_buffer; |
| auto st = MasterServerClient::instance()->confirm_unused_remote_files(req, &result); |
| if (!st.ok()) { |
| LOG(WARNING) << st; |
| return; |
| } |
| for (auto id : result.confirmed_tablets) { |
| if (auto it = buffer.find(id); LIKELY(it != buffer.end())) { |
| auto& storage_resource = it->second.first; |
| auto& files = it->second.second; |
| std::vector<io::Path> paths; |
| paths.reserve(files.size()); |
| // delete unused files |
| LOG(INFO) << "delete unused files. root_path=" << storage_resource.fs->root_path() |
| << " tablet_id=" << id; |
| io::Path dir = storage_resource.remote_tablet_path(id); |
| for (auto& file : files) { |
| auto file_path = dir / file.file_name; |
| LOG(INFO) << "delete unused file: " << file_path.native(); |
| paths.push_back(std::move(file_path)); |
| } |
| st = storage_resource.fs->batch_delete(paths); |
| if (!st.ok()) { |
| LOG(WARNING) << "failed to delete unused files, tablet_id=" << id << " : " |
| << st; |
| } |
| buffer.erase(it); |
| } |
| } |
| }; |
| |
| // batch confirm to reduce FE's overhead |
| auto next_confirm_time = std::chrono::steady_clock::now() + |
| std::chrono::seconds(config::confirm_unused_remote_files_interval_sec); |
| for (auto& t : tablets) { |
| if (t.use_count() <= 1 // this means tablet has been dropped |
| || t->cooldown_conf_unlocked().cooldown_replica_id != t->replica_id() || |
| t->tablet_state() != TABLET_RUNNING) { |
| continue; |
| } |
| calc_unused_remote_files(t.get()); |
| if (num_files_in_buffer > 0 && (num_files_in_buffer > max_files_in_buffer || |
| std::chrono::steady_clock::now() > next_confirm_time)) { |
| confirm_and_remove_files(); |
| buffer.clear(); |
| req.confirm_list.clear(); |
| num_files_in_buffer = 0; |
| next_confirm_time = |
| std::chrono::steady_clock::now() + |
| std::chrono::seconds(config::confirm_unused_remote_files_interval_sec); |
| } |
| } |
| if (num_files_in_buffer > 0) { |
| confirm_and_remove_files(); |
| } |
| } |
| |
| void StorageEngine::_cold_data_compaction_producer_callback() { |
| while (!_stop_background_threads_latch.wait_for( |
| std::chrono::seconds(config::cold_data_compaction_interval_sec))) { |
| if (config::disable_auto_compaction || |
| GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) { |
| continue; |
| } |
| |
| std::unordered_set<int64_t> copied_tablet_submitted; |
| { |
| std::lock_guard lock(_cold_compaction_tablet_submitted_mtx); |
| copied_tablet_submitted = _cold_compaction_tablet_submitted; |
| } |
| int64_t n = config::cold_data_compaction_thread_num - copied_tablet_submitted.size(); |
| if (n <= 0) { |
| continue; |
| } |
| auto tablets = _tablet_manager->get_all_tablet([&copied_tablet_submitted](Tablet* t) { |
| return t->tablet_meta()->cooldown_meta_id().initialized() && t->is_used() && |
| t->tablet_state() == TABLET_RUNNING && |
| !copied_tablet_submitted.contains(t->tablet_id()) && |
| !t->tablet_meta()->tablet_schema()->disable_auto_compaction(); |
| }); |
| std::vector<std::pair<TabletSharedPtr, int64_t>> tablet_to_compact; |
| tablet_to_compact.reserve(n + 1); |
| std::vector<std::pair<TabletSharedPtr, int64_t>> tablet_to_follow; |
| tablet_to_follow.reserve(n + 1); |
| |
| for (auto& t : tablets) { |
| if (t->replica_id() == t->cooldown_conf_unlocked().cooldown_replica_id) { |
| auto score = t->calc_cold_data_compaction_score(); |
| if (score < config::cold_data_compaction_score_threshold) { |
| continue; |
| } |
| tablet_to_compact.emplace_back(t, score); |
| if (tablet_to_compact.size() > n) { |
| std::sort(tablet_to_compact.begin(), tablet_to_compact.end(), |
| [](auto& a, auto& b) { return a.second > b.second; }); |
| tablet_to_compact.pop_back(); |
| } |
| continue; |
| } |
| // else, need to follow |
| { |
| std::lock_guard lock(_running_cooldown_mutex); |
| if (_running_cooldown_tablets.contains(t->table_id())) { |
| // already in cooldown queue |
| continue; |
| } |
| } |
| // TODO(plat1ko): some avoidance strategy if failed to follow |
| auto score = t->calc_cold_data_compaction_score(); |
| tablet_to_follow.emplace_back(t, score); |
| |
| if (tablet_to_follow.size() > n) { |
| std::sort(tablet_to_follow.begin(), tablet_to_follow.end(), |
| [](auto& a, auto& b) { return a.second > b.second; }); |
| tablet_to_follow.pop_back(); |
| } |
| } |
| |
| for (auto& [tablet, score] : tablet_to_compact) { |
| LOG(INFO) << "submit cold data compaction. tablet_id=" << tablet->tablet_id() |
| << " score=" << score; |
| static_cast<void>(_cold_data_compaction_thread_pool->submit_func( |
| [&, t = std::move(tablet), this]() { |
| auto compaction = std::make_shared<ColdDataCompaction>(*this, t); |
| { |
| std::lock_guard lock(_cold_compaction_tablet_submitted_mtx); |
| _cold_compaction_tablet_submitted.insert(t->tablet_id()); |
| } |
| Defer defer {[&] { |
| std::lock_guard lock(_cold_compaction_tablet_submitted_mtx); |
| _cold_compaction_tablet_submitted.erase(t->tablet_id()); |
| }}; |
| std::unique_lock cold_compaction_lock(t->get_cold_compaction_lock(), |
| std::try_to_lock); |
| if (!cold_compaction_lock.owns_lock()) { |
| LOG(WARNING) << "try cold_compaction_lock failed, tablet_id=" |
| << t->tablet_id(); |
| return; |
| } |
| _update_cumulative_compaction_policy(); |
| if (t->get_cumulative_compaction_policy() == nullptr || |
| t->get_cumulative_compaction_policy()->name() != |
| t->tablet_meta()->compaction_policy()) { |
| t->set_cumulative_compaction_policy(_cumulative_compaction_policies.at( |
| t->tablet_meta()->compaction_policy())); |
| } |
| |
| auto st = compaction->prepare_compact(); |
| if (!st.ok()) { |
| LOG(WARNING) << "failed to prepare cold data compaction. tablet_id=" |
| << t->tablet_id() << " err=" << st; |
| return; |
| } |
| |
| st = compaction->execute_compact(); |
| if (!st.ok()) { |
| LOG(WARNING) << "failed to execute cold data compaction. tablet_id=" |
| << t->tablet_id() << " err=" << st; |
| return; |
| } |
| })); |
| } |
| |
| for (auto& [tablet, score] : tablet_to_follow) { |
| LOG(INFO) << "submit to follow cooldown meta. tablet_id=" << tablet->tablet_id() |
| << " score=" << score; |
| static_cast<void>(_cold_data_compaction_thread_pool->submit_func([&, |
| t = std::move( |
| tablet)]() { |
| { |
| std::lock_guard lock(_cold_compaction_tablet_submitted_mtx); |
| _cold_compaction_tablet_submitted.insert(t->tablet_id()); |
| } |
| auto st = t->cooldown(); |
| { |
| std::lock_guard lock(_cold_compaction_tablet_submitted_mtx); |
| _cold_compaction_tablet_submitted.erase(t->tablet_id()); |
| } |
| if (!st.ok()) { |
| // The cooldown of the replica may be relatively slow |
| // resulting in a short period of time where following cannot be successful |
| LOG_EVERY_N(WARNING, 5) |
| << "failed to cooldown. tablet_id=" << t->tablet_id() << " err=" << st; |
| } |
| })); |
| } |
| } |
| } |
| |
| void StorageEngine::add_async_publish_task(int64_t partition_id, int64_t tablet_id, |
| int64_t publish_version, int64_t transaction_id, |
| bool is_recovery) { |
| if (!is_recovery) { |
| bool exists = false; |
| { |
| std::shared_lock<std::shared_mutex> rlock(_async_publish_lock); |
| if (auto tablet_iter = _async_publish_tasks.find(tablet_id); |
| tablet_iter != _async_publish_tasks.end()) { |
| if (auto iter = tablet_iter->second.find(publish_version); |
| iter != tablet_iter->second.end()) { |
| exists = true; |
| } |
| } |
| } |
| if (exists) { |
| return; |
| } |
| TabletSharedPtr tablet = tablet_manager()->get_tablet(tablet_id); |
| if (tablet == nullptr) { |
| LOG(INFO) << "tablet may be dropped when add async publish task, tablet_id: " |
| << tablet_id; |
| return; |
| } |
| PendingPublishInfoPB pending_publish_info_pb; |
| pending_publish_info_pb.set_partition_id(partition_id); |
| pending_publish_info_pb.set_transaction_id(transaction_id); |
| static_cast<void>(TabletMetaManager::save_pending_publish_info( |
| tablet->data_dir(), tablet->tablet_id(), publish_version, |
| pending_publish_info_pb.SerializeAsString())); |
| } |
| LOG(INFO) << "add pending publish task, tablet_id: " << tablet_id |
| << " version: " << publish_version << " txn_id:" << transaction_id |
| << " is_recovery: " << is_recovery; |
| std::unique_lock<std::shared_mutex> wlock(_async_publish_lock); |
| _async_publish_tasks[tablet_id][publish_version] = {transaction_id, partition_id}; |
| } |
| |
| int64_t StorageEngine::get_pending_publish_min_version(int64_t tablet_id) { |
| std::shared_lock<std::shared_mutex> rlock(_async_publish_lock); |
| auto iter = _async_publish_tasks.find(tablet_id); |
| if (iter == _async_publish_tasks.end()) { |
| return INT64_MAX; |
| } |
| if (iter->second.empty()) { |
| return INT64_MAX; |
| } |
| return iter->second.begin()->first; |
| } |
| |
| void StorageEngine::_process_async_publish() { |
| // tablet, publish_version |
| std::vector<std::pair<TabletSharedPtr, int64_t>> need_removed_tasks; |
| { |
| std::unique_lock<std::shared_mutex> wlock(_async_publish_lock); |
| for (auto tablet_iter = _async_publish_tasks.begin(); |
| tablet_iter != _async_publish_tasks.end();) { |
| if (tablet_iter->second.empty()) { |
| tablet_iter = _async_publish_tasks.erase(tablet_iter); |
| continue; |
| } |
| int64_t tablet_id = tablet_iter->first; |
| TabletSharedPtr tablet = tablet_manager()->get_tablet(tablet_id); |
| if (!tablet) { |
| LOG(WARNING) << "tablet does not exist when async publush, tablet_id: " |
| << tablet_id; |
| tablet_iter = _async_publish_tasks.erase(tablet_iter); |
| continue; |
| } |
| |
| auto task_iter = tablet_iter->second.begin(); |
| int64_t version = task_iter->first; |
| int64_t transaction_id = task_iter->second.first; |
| int64_t partition_id = task_iter->second.second; |
| int64_t max_version = tablet->max_version().second; |
| |
| if (version <= max_version) { |
| need_removed_tasks.emplace_back(tablet, version); |
| tablet_iter->second.erase(task_iter); |
| tablet_iter++; |
| continue; |
| } |
| if (version != max_version + 1) { |
| int32_t max_version_config = tablet->max_version_config(); |
| // Keep only the most recent versions |
| while (tablet_iter->second.size() > max_version_config) { |
| need_removed_tasks.emplace_back(tablet, version); |
| task_iter = tablet_iter->second.erase(task_iter); |
| version = task_iter->first; |
| } |
| tablet_iter++; |
| continue; |
| } |
| |
| auto async_publish_task = std::make_shared<AsyncTabletPublishTask>( |
| *this, tablet, partition_id, transaction_id, version); |
| static_cast<void>(_tablet_publish_txn_thread_pool->submit_func( |
| [=]() { async_publish_task->handle(); })); |
| tablet_iter->second.erase(task_iter); |
| need_removed_tasks.emplace_back(tablet, version); |
| tablet_iter++; |
| } |
| } |
| for (auto& [tablet, publish_version] : need_removed_tasks) { |
| static_cast<void>(TabletMetaManager::remove_pending_publish_info( |
| tablet->data_dir(), tablet->tablet_id(), publish_version)); |
| } |
| } |
| |
| void StorageEngine::_async_publish_callback() { |
| while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(30))) { |
| _process_async_publish(); |
| } |
| } |
| |
| void StorageEngine::_check_tablet_delete_bitmap_score_callback() { |
| LOG(INFO) << "try to start check tablet delete bitmap score!"; |
| while (!_stop_background_threads_latch.wait_for( |
| std::chrono::seconds(config::check_tablet_delete_bitmap_interval_seconds))) { |
| if (!config::enable_check_tablet_delete_bitmap_score) { |
| return; |
| } |
| uint64_t max_delete_bitmap_score = 0; |
| uint64_t max_base_rowset_delete_bitmap_score = 0; |
| _tablet_manager->get_topn_tablet_delete_bitmap_score(&max_delete_bitmap_score, |
| &max_base_rowset_delete_bitmap_score); |
| if (max_delete_bitmap_score > 0) { |
| _tablet_max_delete_bitmap_score_metrics->set_value(max_delete_bitmap_score); |
| } |
| if (max_base_rowset_delete_bitmap_score > 0) { |
| _tablet_max_base_rowset_delete_bitmap_score_metrics->set_value( |
| max_base_rowset_delete_bitmap_score); |
| } |
| } |
| } |
| #include "common/compile_check_end.h" |
| } // namespace doris |