| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "olap/storage_engine.h" |
| |
| // IWYU pragma: no_include <bthread/errno.h> |
| #include <fmt/format.h> |
| #include <gen_cpp/AgentService_types.h> |
| #include <gen_cpp/Types_types.h> |
| #include <glog/logging.h> |
| #include <rapidjson/document.h> |
| #include <rapidjson/encodings.h> |
| #include <rapidjson/prettywriter.h> |
| #include <rapidjson/stringbuffer.h> |
| #include <sys/resource.h> |
| #include <thrift/protocol/TDebugProtocol.h> |
| |
| #include <algorithm> |
| #include <boost/algorithm/string/case_conv.hpp> |
| #include <boost/container/detail/std_fwd.hpp> |
| #include <cassert> |
| #include <cerrno> // IWYU pragma: keep |
| #include <chrono> |
| #include <cstdlib> |
| #include <cstring> |
| #include <filesystem> |
| #include <iterator> |
| #include <memory> |
| #include <mutex> |
| #include <ostream> |
| #include <set> |
| #include <thread> |
| #include <unordered_set> |
| #include <utility> |
| |
| #include "agent/task_worker_pool.h" |
| #include "cloud/cloud_storage_engine.h" |
| #include "common/config.h" |
| #include "common/logging.h" |
| #include "common/status.h" |
| #include "gen_cpp/FrontendService.h" |
| #include "io/fs/local_file_system.h" |
| #include "olap/binlog.h" |
| #include "olap/data_dir.h" |
| #include "olap/id_manager.h" |
| #include "olap/memtable_flush_executor.h" |
| #include "olap/olap_common.h" |
| #include "olap/olap_define.h" |
| #include "olap/rowset/rowset_fwd.h" |
| #include "olap/rowset/rowset_meta.h" |
| #include "olap/rowset/rowset_meta_manager.h" |
| #include "olap/rowset/unique_rowset_id_generator.h" |
| #include "olap/schema_cache.h" |
| #include "olap/single_replica_compaction.h" |
| #include "olap/snapshot_manager.h" |
| #include "olap/tablet_manager.h" |
| #include "olap/tablet_meta.h" |
| #include "olap/tablet_meta_manager.h" |
| #include "olap/txn_manager.h" |
| #include "runtime/client_cache.h" |
| #include "runtime/stream_load/stream_load_recorder.h" |
| #include "util/doris_metrics.h" |
| #include "util/mem_info.h" |
| #include "util/metrics.h" |
| #include "util/stopwatch.hpp" |
| #include "util/thread.h" |
| #include "util/threadpool.h" |
| #include "util/thrift_rpc_helper.h" |
| #include "util/uid_util.h" |
| #include "util/work_thread_pool.hpp" |
| #include "vec/common/assert_cast.h" |
| |
| using std::filesystem::directory_iterator; |
| using std::filesystem::path; |
| using std::map; |
| using std::set; |
| using std::string; |
| using std::stringstream; |
| using std::vector; |
| |
| namespace doris { |
| #include "common/compile_check_begin.h" |
| using namespace ErrorCode; |
| extern void get_round_robin_stores(int64_t curr_index, const std::vector<DirInfo>& dir_infos, |
| std::vector<DataDir*>& stores); |
| DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(unused_rowsets_count, MetricUnit::ROWSETS); |
| bvar::Status<int64_t> g_max_rowsets_with_useless_delete_bitmap( |
| "max_rowsets_with_useless_delete_bitmap", 0); |
| bvar::Status<int64_t> g_max_rowsets_with_useless_delete_bitmap_version( |
| "max_rowsets_with_useless_delete_bitmap_version", 0); |
| |
| namespace { |
bvar::Adder<uint64_t> unused_rowsets_counter("unused_rowsets_counter");
| }; |
| |
| BaseStorageEngine::BaseStorageEngine(Type type, const UniqueId& backend_uid) |
| : _type(type), |
| _rowset_id_generator(std::make_unique<UniqueRowsetIdGenerator>(backend_uid)), |
| _stop_background_threads_latch(1) { |
| _memory_limitation_bytes_for_schema_change = static_cast<int64_t>( |
| static_cast<double>(MemInfo::soft_mem_limit()) * config::schema_change_mem_limit_frac); |
| _tablet_max_delete_bitmap_score_metrics = |
| std::make_shared<bvar::Status<size_t>>("tablet_max", "delete_bitmap_score", 0); |
| _tablet_max_base_rowset_delete_bitmap_score_metrics = std::make_shared<bvar::Status<size_t>>( |
| "tablet_max_base_rowset", "delete_bitmap_score", 0); |
| } |
| |
| BaseStorageEngine::~BaseStorageEngine() = default; |
| |
| RowsetId BaseStorageEngine::next_rowset_id() { |
| return _rowset_id_generator->next_id(); |
| } |
| |
| StorageEngine& BaseStorageEngine::to_local() { |
| CHECK_EQ(_type, Type::LOCAL); |
| return *static_cast<StorageEngine*>(this); |
| } |
| |
| CloudStorageEngine& BaseStorageEngine::to_cloud() { |
| CHECK_EQ(_type, Type::CLOUD); |
| return *static_cast<CloudStorageEngine*>(this); |
| } |
| |
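// Per-thread schema-change memory budget: the engine-wide schema-change budget divided by
// the number of alter workers, but never less than the configured per-thread minimum.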
| int64_t BaseStorageEngine::memory_limitation_bytes_per_thread_for_schema_change() const { |
| return std::max(_memory_limitation_bytes_for_schema_change / config::alter_tablet_worker_count, |
| config::memory_limitation_per_thread_for_schema_change_bytes); |
| } |
| |
| Status BaseStorageEngine::init_stream_load_recorder(const std::string& stream_load_record_path) { |
| LOG(INFO) << "stream load record path: " << stream_load_record_path; |
| // init stream load record rocksdb |
| _stream_load_recorder = StreamLoadRecorder::create_shared(stream_load_record_path); |
| if (_stream_load_recorder == nullptr) { |
| RETURN_NOT_OK_STATUS_WITH_WARN( |
| Status::MemoryAllocFailed("allocate memory for StreamLoadRecorder failed"), |
| "new StreamLoadRecorder failed"); |
| } |
| auto st = _stream_load_recorder->init(); |
| if (!st.ok()) { |
| RETURN_NOT_OK_STATUS_WITH_WARN( |
| Status::IOError("open StreamLoadRecorder rocksdb failed, path={}", |
| stream_load_record_path), |
| "init StreamLoadRecorder failed"); |
| } |
| return Status::OK(); |
| } |
| |
| void CompactionSubmitRegistry::jsonfy_compaction_status(std::string* result) { |
| rapidjson::Document root; |
| root.SetObject(); |
| |
| auto add_node = [&root](const std::string& name, const Registry& registry) { |
| rapidjson::Value key; |
| key.SetString(name.c_str(), cast_set<uint32_t>(name.length()), root.GetAllocator()); |
| rapidjson::Document path_obj; |
| path_obj.SetObject(); |
| for (const auto& it : registry) { |
| const auto& dir = it.first->path(); |
| rapidjson::Value path_key; |
| path_key.SetString(dir.c_str(), cast_set<uint32_t>(dir.length()), root.GetAllocator()); |
| |
| rapidjson::Document arr; |
| arr.SetArray(); |
| |
| for (const auto& tablet : it.second) { |
| rapidjson::Value temp_key; |
| auto key_str = std::to_string(tablet->tablet_id()); |
| temp_key.SetString(key_str.c_str(), cast_set<uint32_t>(key_str.length()), |
| root.GetAllocator()); |
                arr.PushBack(temp_key, root.GetAllocator());
| } |
| path_obj.AddMember(path_key, arr, root.GetAllocator()); |
| } |
| root.AddMember(key, path_obj, root.GetAllocator()); |
| }; |
| |
| std::unique_lock<std::mutex> l(_tablet_submitted_compaction_mutex); |
| add_node("BaseCompaction", _tablet_submitted_base_compaction); |
| add_node("CumulativeCompaction", _tablet_submitted_cumu_compaction); |
| add_node("FullCompaction", _tablet_submitted_full_compaction); |
| |
| rapidjson::StringBuffer str_buf; |
| rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(str_buf); |
| root.Accept(writer); |
| *result = std::string(str_buf.GetString()); |
| } |
| |
| static Status _validate_options(const EngineOptions& options) { |
| if (options.store_paths.empty()) { |
| return Status::InternalError("store paths is empty"); |
| } |
| return Status::OK(); |
| } |
| |
| Status StorageEngine::open() { |
| RETURN_IF_ERROR(_validate_options(_options)); |
| LOG(INFO) << "starting backend using uid:" << _options.backend_uid.to_string(); |
| RETURN_NOT_OK_STATUS_WITH_WARN(_open(), "open engine failed"); |
| LOG(INFO) << "success to init storage engine."; |
| return Status::OK(); |
| } |
| |
| StorageEngine::StorageEngine(const EngineOptions& options) |
| : BaseStorageEngine(Type::LOCAL, options.backend_uid), |
| _options(options), |
| _available_storage_medium_type_count(0), |
| _is_all_cluster_id_exist(true), |
| _stopped(false), |
| _tablet_manager(new TabletManager(*this, config::tablet_map_shard_size)), |
| _txn_manager(new TxnManager(*this, config::txn_map_shard_size, config::txn_shard_size)), |
| _default_rowset_type(BETA_ROWSET), |
| _create_tablet_idx_lru_cache( |
| new CreateTabletRRIdxCache(config::partition_disk_index_lru_size)), |
| _snapshot_mgr(std::make_unique<SnapshotManager>(*this)) { |
| REGISTER_HOOK_METRIC(unused_rowsets_count, [this]() { |
| // std::lock_guard<std::mutex> lock(_gc_mutex); |
| return _unused_rowsets.size(); |
| }); |
| |
| _broken_paths = options.broken_paths; |
| } |
| |
| StorageEngine::~StorageEngine() { |
| stop(); |
| } |
| |
| static Status load_data_dirs(const std::vector<DataDir*>& data_dirs) { |
| std::unique_ptr<ThreadPool> pool; |
| |
| int num_threads = config::load_data_dirs_threads; |
| if (num_threads <= 0) { |
| num_threads = cast_set<int>(data_dirs.size()); |
| } |
| |
| auto st = ThreadPoolBuilder("load_data_dir") |
| .set_min_threads(num_threads) |
| .set_max_threads(num_threads) |
| .build(&pool); |
| CHECK(st.ok()) << st; |
| |
| std::mutex result_mtx; |
| Status result; |
| |
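    // Load each data dir on its own pool thread. Once any dir fails, `result` records the
    // first error and the remaining queued tasks return early without loading.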
| for (auto* data_dir : data_dirs) { |
| st = pool->submit_func([&, data_dir] { |
| SCOPED_INIT_THREAD_CONTEXT(); |
| { |
| std::lock_guard lock(result_mtx); |
| if (!result.ok()) { // Some data dir has failed |
| return; |
| } |
| } |
| |
| auto st = data_dir->load(); |
| if (!st.ok()) { |
| LOG(WARNING) << "error occured when init load tables. res=" << st |
| << ", data dir=" << data_dir->path(); |
| std::lock_guard lock(result_mtx); |
| result = std::move(st); |
| } |
| }); |
| |
| if (!st.ok()) { |
| return st; |
| } |
| } |
| |
| pool->wait(); |
| |
| return result; |
| } |
| |
| Status StorageEngine::_open() { |
| // init store_map |
| RETURN_NOT_OK_STATUS_WITH_WARN(_init_store_map(), "_init_store_map failed"); |
| |
| _effective_cluster_id = config::cluster_id; |
| RETURN_NOT_OK_STATUS_WITH_WARN(_check_all_root_path_cluster_id(), "fail to check cluster id"); |
| |
| _update_storage_medium_type_count(); |
| |
| RETURN_NOT_OK_STATUS_WITH_WARN(_check_file_descriptor_number(), "check fd number failed"); |
| |
| auto dirs = get_stores(); |
| RETURN_IF_ERROR(load_data_dirs(dirs)); |
| |
| _disk_num = cast_set<int>(dirs.size()); |
| _memtable_flush_executor = std::make_unique<MemTableFlushExecutor>(); |
| _memtable_flush_executor->init(_disk_num); |
| |
| _calc_delete_bitmap_executor = std::make_unique<CalcDeleteBitmapExecutor>(); |
| _calc_delete_bitmap_executor->init(config::calc_delete_bitmap_max_thread); |
| |
| _calc_delete_bitmap_executor_for_load = std::make_unique<CalcDeleteBitmapExecutor>(); |
| _calc_delete_bitmap_executor_for_load->init( |
| config::calc_delete_bitmap_for_load_max_thread > 0 |
| ? config::calc_delete_bitmap_for_load_max_thread |
| : std::max(1, CpuInfo::num_cores() / 2)); |
| |
| _parse_default_rowset_type(); |
| |
| return Status::OK(); |
| } |
| |
| Status StorageEngine::_init_store_map() { |
| std::vector<std::thread> threads; |
| std::mutex error_msg_lock; |
| std::string error_msg; |
| for (auto& path : _options.store_paths) { |
| auto store = std::make_unique<DataDir>(*this, path.path, path.capacity_bytes, |
| path.storage_medium); |
| threads.emplace_back([store = store.get(), &error_msg_lock, &error_msg]() { |
| SCOPED_INIT_THREAD_CONTEXT(); |
| auto st = store->init(); |
| if (!st.ok()) { |
| { |
| std::lock_guard<std::mutex> l(error_msg_lock); |
| error_msg.append(st.to_string() + ";"); |
| } |
| LOG(WARNING) << "Store load failed, status=" << st.to_string() |
| << ", path=" << store->path(); |
| } |
| }); |
| _store_map.emplace(store->path(), std::move(store)); |
| } |
| for (auto& thread : threads) { |
| thread.join(); |
| } |
| |
| // All store paths MUST init successfully |
| if (!error_msg.empty()) { |
| return Status::InternalError("init path failed, error={}", error_msg); |
| } |
| |
| RETURN_NOT_OK_STATUS_WITH_WARN(init_stream_load_recorder(_options.store_paths[0].path), |
| "init StreamLoadRecorder failed"); |
| |
| return Status::OK(); |
| } |
| |
| void StorageEngine::_update_storage_medium_type_count() { |
| set<TStorageMedium::type> available_storage_medium_types; |
| |
| std::lock_guard<std::mutex> l(_store_lock); |
| for (auto& it : _store_map) { |
| if (it.second->is_used()) { |
| available_storage_medium_types.insert(it.second->storage_medium()); |
| } |
| } |
| |
| _available_storage_medium_type_count = |
| cast_set<uint32_t>(available_storage_medium_types.size()); |
| } |
| |
| Status StorageEngine::_judge_and_update_effective_cluster_id(int32_t cluster_id) { |
| if (cluster_id == -1 && _effective_cluster_id == -1) { |
| // maybe this is a new cluster, cluster id will get from heartbeat message |
| return Status::OK(); |
| } else if (cluster_id != -1 && _effective_cluster_id == -1) { |
| _effective_cluster_id = cluster_id; |
| return Status::OK(); |
| } else if (cluster_id == -1 && _effective_cluster_id != -1) { |
| // _effective_cluster_id is the right effective cluster id |
| return Status::OK(); |
| } else { |
| if (cluster_id != _effective_cluster_id) { |
| RETURN_NOT_OK_STATUS_WITH_WARN( |
| Status::Corruption("multiple cluster ids is not equal. one={}, other={}", |
| _effective_cluster_id, cluster_id), |
| "cluster id not equal"); |
| } |
| } |
| |
| return Status::OK(); |
| } |
| |
| std::vector<DataDir*> StorageEngine::get_stores(bool include_unused) { |
| std::vector<DataDir*> stores; |
| stores.reserve(_store_map.size()); |
| |
| std::lock_guard<std::mutex> l(_store_lock); |
| if (include_unused) { |
| for (auto&& [_, store] : _store_map) { |
| stores.push_back(store.get()); |
| } |
| } else { |
| for (auto&& [_, store] : _store_map) { |
| if (store->is_used()) { |
| stores.push_back(store.get()); |
| } |
| } |
| } |
| return stores; |
| } |
| |
| Status StorageEngine::get_all_data_dir_info(std::vector<DataDirInfo>* data_dir_infos, |
| bool need_update) { |
| Status res = Status::OK(); |
| data_dir_infos->clear(); |
| |
| MonotonicStopWatch timer; |
| timer.start(); |
| |
| // 1. update available capacity of each data dir |
| // get all root path info and construct a path map. |
| // path -> DataDirInfo |
| std::map<std::string, DataDirInfo> path_map; |
| { |
| std::lock_guard<std::mutex> l(_store_lock); |
| for (auto& it : _store_map) { |
| if (need_update) { |
| RETURN_IF_ERROR(it.second->update_capacity()); |
| } |
| path_map.emplace(it.first, it.second->get_dir_info()); |
| } |
| } |
| |
| // 2. get total tablets' size of each data dir |
| size_t tablet_count = 0; |
| _tablet_manager->update_root_path_info(&path_map, &tablet_count); |
| |
| // 3. update metrics in DataDir |
| for (auto& path : path_map) { |
| std::lock_guard<std::mutex> l(_store_lock); |
| auto data_dir = _store_map.find(path.first); |
| DCHECK(data_dir != _store_map.end()); |
| data_dir->second->update_local_data_size(path.second.local_used_capacity); |
| data_dir->second->update_remote_data_size(path.second.remote_used_capacity); |
| } |
| |
| // add path info to data_dir_infos |
| for (auto& entry : path_map) { |
| data_dir_infos->emplace_back(entry.second); |
| } |
| |
| timer.stop(); |
| LOG(INFO) << "get root path info cost: " << timer.elapsed_time() / 1000000 |
| << " ms. tablet counter: " << tablet_count; |
| |
| return res; |
| } |
| |
| int64_t StorageEngine::get_file_or_directory_size(const std::string& file_path) { |
| if (!std::filesystem::exists(file_path)) { |
| return 0; |
| } |
| if (!std::filesystem::is_directory(file_path)) { |
| return std::filesystem::file_size(file_path); |
| } |
| int64_t sum_size = 0; |
| for (const auto& it : std::filesystem::directory_iterator(file_path)) { |
| sum_size += get_file_or_directory_size(it.path()); |
| } |
| return sum_size; |
| } |
| |
| void StorageEngine::_start_disk_stat_monitor() { |
| for (auto& it : _store_map) { |
| it.second->health_check(); |
| } |
| |
| _update_storage_medium_type_count(); |
| |
| _exit_if_too_many_disks_are_failed(); |
| } |
| |
| // TODO(lingbin): Should be in EnvPosix? |
| Status StorageEngine::_check_file_descriptor_number() { |
| struct rlimit l; |
| int ret = getrlimit(RLIMIT_NOFILE, &l); |
| if (ret != 0) { |
| LOG(WARNING) << "call getrlimit() failed. errno=" << strerror(errno) |
| << ", use default configuration instead."; |
| return Status::OK(); |
| } |
| if (getenv("SKIP_CHECK_ULIMIT") == nullptr) { |
| LOG(INFO) << "will check 'ulimit' value."; |
| } else if (std::string(getenv("SKIP_CHECK_ULIMIT")) == "true") { |
| LOG(INFO) << "the 'ulimit' value check is skipped" |
| << ", the SKIP_CHECK_ULIMIT env value is " << getenv("SKIP_CHECK_ULIMIT"); |
| return Status::OK(); |
| } else { |
| LOG(INFO) << "the SKIP_CHECK_ULIMIT env value is " << getenv("SKIP_CHECK_ULIMIT") |
| << ", will check ulimit value."; |
| } |
| if (l.rlim_cur < config::min_file_descriptor_number) { |
| LOG(ERROR) << "File descriptor number is less than " << config::min_file_descriptor_number |
| << ". Please use (ulimit -n) to set a value equal or greater than " |
| << config::min_file_descriptor_number; |
| return Status::Error<ErrorCode::EXCEEDED_LIMIT>( |
| "file descriptors limit {} is small than {}", l.rlim_cur, |
| config::min_file_descriptor_number); |
| } |
| return Status::OK(); |
| } |
| |
| Status StorageEngine::_check_all_root_path_cluster_id() { |
| int32_t cluster_id = -1; |
| for (auto& it : _store_map) { |
| int32_t tmp_cluster_id = it.second->cluster_id(); |
| if (it.second->cluster_id_incomplete()) { |
| _is_all_cluster_id_exist = false; |
| } else if (tmp_cluster_id == cluster_id) { |
| // both have right cluster id, do nothing |
| } else if (cluster_id == -1) { |
| cluster_id = tmp_cluster_id; |
| } else { |
| RETURN_NOT_OK_STATUS_WITH_WARN( |
| Status::Corruption("multiple cluster ids is not equal. one={}, other={}", |
| cluster_id, tmp_cluster_id), |
| "cluster id not equal"); |
| } |
| } |
| |
| // judge and get effective cluster id |
| RETURN_IF_ERROR(_judge_and_update_effective_cluster_id(cluster_id)); |
| |
| // write cluster id into cluster_id_path if get effective cluster id success |
| if (_effective_cluster_id != -1 && !_is_all_cluster_id_exist) { |
| RETURN_IF_ERROR(set_cluster_id(_effective_cluster_id)); |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status StorageEngine::set_cluster_id(int32_t cluster_id) { |
| std::lock_guard<std::mutex> l(_store_lock); |
| for (auto& it : _store_map) { |
| RETURN_IF_ERROR(it.second->set_cluster_id(cluster_id)); |
| } |
| _effective_cluster_id = cluster_id; |
| _is_all_cluster_id_exist = true; |
| return Status::OK(); |
| } |
| |
| int StorageEngine::_get_and_set_next_disk_index(int64_t partition_id, |
| TStorageMedium::type storage_medium) { |
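    // Each (partition_id, storage_medium) pair keeps its own round-robin cursor in an LRU
    // cache, so tablets of one partition spread across disks. If the key is missing (evicted
    // or never set), continue from the next index after the last one used for this medium.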
| auto key = CreateTabletRRIdxCache::get_key(partition_id, storage_medium); |
| int curr_index = _create_tablet_idx_lru_cache->get_index(key); |
    // curr_index == -1 means the key was not found in the LRU cache
| if (curr_index == -1) { |
| curr_index = std::max(0, _last_use_index[storage_medium] + 1); |
| } |
| _last_use_index[storage_medium] = curr_index; |
| _create_tablet_idx_lru_cache->set_index(key, std::max(0, curr_index + 1)); |
| return curr_index; |
| } |
| |
| void StorageEngine::_get_candidate_stores(TStorageMedium::type storage_medium, |
| std::vector<DirInfo>& dir_infos) { |
| std::vector<double> usages; |
| for (auto& it : _store_map) { |
| DataDir* data_dir = it.second.get(); |
| if (data_dir->is_used()) { |
| if ((_available_storage_medium_type_count == 1 || |
| data_dir->storage_medium() == storage_medium) && |
| !data_dir->reach_capacity_limit(0)) { |
| double usage = data_dir->get_usage(0); |
| DirInfo dir_info; |
| dir_info.data_dir = data_dir; |
| dir_info.usage = usage; |
| dir_info.available_level = 0; |
| usages.push_back(usage); |
| dir_infos.push_back(dir_info); |
| } |
| } |
| } |
| |
| if (dir_infos.size() <= 1) { |
| return; |
| } |
| |
| std::sort(usages.begin(), usages.end()); |
| if (usages.back() < 0.7) { |
| return; |
| } |
| |
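    // At least one disk is above 70% usage: split the disks into usage levels; a higher
    // available_level marks a more heavily used disk.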
| std::vector<double> level_min_usages; |
| level_min_usages.push_back(usages[0]); |
| for (auto usage : usages) { |
        // usages below 0.7 are treated as a single level; allow a small skew
| if (usage < 0.7 - (config::high_disk_avail_level_diff_usages / 2.0)) { |
| continue; |
| } |
| |
| // at high usages, default 15% is one level |
        // for example: if the disk usages are: 0.66, 0.72, 0.83
| // then level_min_usages = [0.66, 0.83], divide disks into 2 levels: [0.66, 0.72], [0.83] |
| if (usage >= level_min_usages.back() + config::high_disk_avail_level_diff_usages) { |
| level_min_usages.push_back(usage); |
| } |
| } |
| for (auto& dir_info : dir_infos) { |
| double usage = dir_info.usage; |
| for (size_t i = 1; i < level_min_usages.size() && usage >= level_min_usages[i]; i++) { |
| dir_info.available_level++; |
| } |
| |
        // when a disk's usage is too high, bump it to a higher level regardless of balance.
        // for example, with two disks at usages 0.85 and 0.92, let new tablets fall on the first disk.
| // by default, storage_flood_stage_usage_percent = 90 |
| if (usage > config::storage_flood_stage_usage_percent / 100.0) { |
| dir_info.available_level++; |
| } |
| } |
| } |
| |
| std::vector<DataDir*> StorageEngine::get_stores_for_create_tablet( |
| int64_t partition_id, TStorageMedium::type storage_medium) { |
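    // Pick the round-robin start index for this (partition, medium), collect usable data dirs
    // of the requested medium, then return them ordered level by level starting from that
    // index (see get_round_robin_stores below).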
| std::vector<DirInfo> dir_infos; |
| int curr_index = 0; |
| std::vector<DataDir*> stores; |
| { |
| std::lock_guard<std::mutex> l(_store_lock); |
| curr_index = _get_and_set_next_disk_index(partition_id, storage_medium); |
| _get_candidate_stores(storage_medium, dir_infos); |
| } |
| |
| std::sort(dir_infos.begin(), dir_infos.end()); |
| get_round_robin_stores(curr_index, dir_infos, stores); |
| |
| return stores; |
| } |
| |
// Fill `stores` level by level (LOW, MID, HIGH), doing a round robin within each level.
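// For example (following the loop below): with 4 sorted dir_infos whose available levels
// are [0, 0, 0, 1] and curr_index = 2, the output order is
// [dir_infos[2], dir_infos[0], dir_infos[1], dir_infos[3]].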
| void get_round_robin_stores(int64_t curr_index, const std::vector<DirInfo>& dir_infos, |
| std::vector<DataDir*>& stores) { |
| for (size_t i = 0; i < dir_infos.size();) { |
| size_t end = i + 1; |
| while (end < dir_infos.size() && |
| dir_infos[i].available_level == dir_infos[end].available_level) { |
| end++; |
| } |
        // data dirs [i, end) share the same available level; round robin within [i, end)
| size_t count = end - i; |
| for (size_t k = 0; k < count; k++) { |
| size_t index = i + ((k + curr_index) % count); |
| stores.push_back(dir_infos[index].data_dir); |
| } |
| i = end; |
| } |
| } |
| |
| DataDir* StorageEngine::get_store(const std::string& path) { |
| // _store_map is unchanged, no need to lock |
| auto it = _store_map.find(path); |
| if (it == _store_map.end()) { |
| return nullptr; |
| } |
| return it->second.get(); |
| } |
| |
| static bool too_many_disks_are_failed(uint32_t unused_num, uint32_t total_num) { |
| return ((total_num == 0) || |
| (unused_num * 100 / total_num > config::max_percentage_of_error_disk)); |
| } |
| |
| void StorageEngine::_exit_if_too_many_disks_are_failed() { |
| uint32_t unused_root_path_num = 0; |
| uint32_t total_root_path_num = 0; |
| |
| { |
| // TODO(yingchun): _store_map is only updated in main and ~StorageEngine, maybe we can remove it? |
| std::lock_guard<std::mutex> l(_store_lock); |
| if (_store_map.empty()) { |
| return; |
| } |
| |
| for (auto& it : _store_map) { |
| ++total_root_path_num; |
| if (it.second->is_used()) { |
| continue; |
| } |
| ++unused_root_path_num; |
| } |
| } |
| |
| if (too_many_disks_are_failed(unused_root_path_num, total_root_path_num)) { |
| LOG(FATAL) << "meet too many error disks, process exit. " |
| << "max_ratio_allowed=" << config::max_percentage_of_error_disk << "%" |
| << ", error_disk_count=" << unused_root_path_num |
| << ", total_disk_count=" << total_root_path_num; |
| exit(0); |
| } |
| } |
| |
| void StorageEngine::stop() { |
| if (_stopped) { |
| LOG(WARNING) << "Storage engine is stopped twice."; |
| return; |
| } |
| // trigger the waiting threads |
| notify_listeners(); |
| |
| { |
| std::lock_guard<std::mutex> l(_store_lock); |
| for (auto& store_pair : _store_map) { |
| store_pair.second->stop_bg_worker(); |
| } |
| } |
| |
| _stop_background_threads_latch.count_down(); |
| #define THREAD_JOIN(thread) \ |
| if (thread) { \ |
| thread->join(); \ |
| } |
| |
| THREAD_JOIN(_compaction_tasks_producer_thread); |
| THREAD_JOIN(_update_replica_infos_thread); |
| THREAD_JOIN(_unused_rowset_monitor_thread); |
| THREAD_JOIN(_garbage_sweeper_thread); |
| THREAD_JOIN(_disk_stat_monitor_thread); |
| THREAD_JOIN(_cache_clean_thread); |
| THREAD_JOIN(_tablet_checkpoint_tasks_producer_thread); |
| THREAD_JOIN(_async_publish_thread); |
| THREAD_JOIN(_cold_data_compaction_producer_thread); |
| THREAD_JOIN(_cooldown_tasks_producer_thread); |
| THREAD_JOIN(_check_delete_bitmap_score_thread); |
| #undef THREAD_JOIN |
| |
| #define THREADS_JOIN(threads) \ |
| for (const auto& thread : threads) { \ |
| if (thread) { \ |
| thread->join(); \ |
| } \ |
| } |
| |
| THREADS_JOIN(_path_gc_threads); |
| #undef THREADS_JOIN |
| |
| if (_base_compaction_thread_pool) { |
| _base_compaction_thread_pool->shutdown(); |
| } |
| if (_cumu_compaction_thread_pool) { |
| _cumu_compaction_thread_pool->shutdown(); |
| } |
| if (_single_replica_compaction_thread_pool) { |
| _single_replica_compaction_thread_pool->shutdown(); |
| } |
| |
| if (_seg_compaction_thread_pool) { |
| _seg_compaction_thread_pool->shutdown(); |
| } |
| if (_tablet_meta_checkpoint_thread_pool) { |
| _tablet_meta_checkpoint_thread_pool->shutdown(); |
| } |
| if (_cold_data_compaction_thread_pool) { |
| _cold_data_compaction_thread_pool->shutdown(); |
| } |
| |
| if (_cooldown_thread_pool) { |
| _cooldown_thread_pool->shutdown(); |
| } |
| |
| _memtable_flush_executor.reset(nullptr); |
| _calc_delete_bitmap_executor.reset(nullptr); |
| _calc_delete_bitmap_executor_for_load.reset(); |
| |
| _stopped = true; |
| LOG(INFO) << "Storage engine is stopped."; |
| } |
| |
| void StorageEngine::clear_transaction_task(const TTransactionId transaction_id) { |
    // The clear-transaction task may not contain partition ids, so get them from the txn manager.
| std::vector<int64_t> partition_ids; |
| _txn_manager->get_partition_ids(transaction_id, &partition_ids); |
| clear_transaction_task(transaction_id, partition_ids); |
| } |
| |
| void StorageEngine::clear_transaction_task(const TTransactionId transaction_id, |
| const std::vector<TPartitionId>& partition_ids) { |
| LOG(INFO) << "begin to clear transaction task. transaction_id=" << transaction_id; |
| |
| for (const TPartitionId& partition_id : partition_ids) { |
| std::map<TabletInfo, RowsetSharedPtr> tablet_infos; |
| _txn_manager->get_txn_related_tablets(transaction_id, partition_id, &tablet_infos); |
| |
| // each tablet |
| for (auto& tablet_info : tablet_infos) { |
| // should use tablet uid to ensure clean txn correctly |
| TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_info.first.tablet_id, |
| tablet_info.first.tablet_uid); |
            // The tablet may have been dropped or altered; leave an INFO log and continue with other tablets
| if (tablet == nullptr) { |
| LOG(INFO) << "tablet is no longer exist. tablet_id=" << tablet_info.first.tablet_id |
| << ", tablet_uid=" << tablet_info.first.tablet_uid; |
| continue; |
| } |
| Status s = _txn_manager->delete_txn(partition_id, tablet, transaction_id); |
| if (!s.ok()) { |
| LOG(WARNING) << "failed to clear transaction. txn_id=" << transaction_id |
| << ", partition_id=" << partition_id |
| << ", tablet_id=" << tablet_info.first.tablet_id |
| << ", status=" << s.to_string(); |
| } |
| } |
| } |
| LOG(INFO) << "finish to clear transaction task. transaction_id=" << transaction_id; |
| } |
| |
| Status StorageEngine::start_trash_sweep(double* usage, bool ignore_guard) { |
| Status res = Status::OK(); |
| |
| std::unique_lock<std::mutex> l(_trash_sweep_lock, std::defer_lock); |
| if (!l.try_lock()) { |
| LOG(INFO) << "trash and snapshot sweep is running."; |
| if (ignore_guard) { |
| _need_clean_trash.store(true, std::memory_order_relaxed); |
| } |
| return res; |
| } |
| |
| LOG(INFO) << "start trash and snapshot sweep. is_clean=" << ignore_guard; |
| |
| const int32_t snapshot_expire = config::snapshot_expire_time_sec; |
| const int32_t trash_expire = config::trash_file_expire_time_sec; |
| // the guard space should be lower than storage_flood_stage_usage_percent, |
    // so here we multiply by 0.9.
| // if ignore_guard is true, set guard_space to 0. |
| const double guard_space = |
| ignore_guard ? 0 : config::storage_flood_stage_usage_percent / 100.0 * 0.9; |
| std::vector<DataDirInfo> data_dir_infos; |
| RETURN_NOT_OK_STATUS_WITH_WARN(get_all_data_dir_info(&data_dir_infos, false), |
| "failed to get root path stat info when sweep trash.") |
| std::sort(data_dir_infos.begin(), data_dir_infos.end(), DataDirInfoLessAvailability()); |
| |
    time_t now = time(nullptr); // get current UTC time
| tm local_tm_now; |
| local_tm_now.tm_isdst = 0; |
| if (localtime_r(&now, &local_tm_now) == nullptr) { |
| return Status::Error<OS_ERROR>("fail to localtime_r time. time={}", now); |
| } |
    const time_t local_now = mktime(&local_tm_now); // convert to local calendar time
| |
| double tmp_usage = 0.0; |
| for (DataDirInfo& info : data_dir_infos) { |
| LOG(INFO) << "Start to sweep path " << info.path; |
| if (!info.is_used) { |
| continue; |
| } |
| |
| double curr_usage = |
| (double)(info.disk_capacity - info.available) / (double)info.disk_capacity; |
| tmp_usage = std::max(tmp_usage, curr_usage); |
| |
| Status curr_res = Status::OK(); |
| auto snapshot_path = fmt::format("{}/{}", info.path, SNAPSHOT_PREFIX); |
| curr_res = _do_sweep(snapshot_path, local_now, snapshot_expire); |
| if (!curr_res.ok()) { |
| LOG(WARNING) << "failed to sweep snapshot. path=" << snapshot_path |
| << ", err_code=" << curr_res; |
| res = curr_res; |
| } |
| |
| auto trash_path = fmt::format("{}/{}", info.path, TRASH_PREFIX); |
| curr_res = _do_sweep(trash_path, local_now, curr_usage > guard_space ? 0 : trash_expire); |
| if (!curr_res.ok()) { |
| LOG(WARNING) << "failed to sweep trash. path=" << trash_path |
| << ", err_code=" << curr_res; |
| res = curr_res; |
| } |
| } |
| |
| if (usage != nullptr) { |
| *usage = tmp_usage; // update usage |
| } |
| |
| // clear expire incremental rowset, move deleted tablet to trash |
| RETURN_IF_ERROR(_tablet_manager->start_trash_sweep()); |
| |
| // clean rubbish transactions |
| _clean_unused_txns(); |
| |
| // clean unused rowset metas in OlapMeta |
| _clean_unused_rowset_metas(); |
| |
| // clean unused binlog metas in OlapMeta |
| _clean_unused_binlog_metas(); |
| |
    // clean unused delete bitmaps for deleted tablets
| _clean_unused_delete_bitmap(); |
| |
    // clean unused pending publish info for deleted tablets
| _clean_unused_pending_publish_info(); |
| |
| // clean unused partial update info for finished txns |
| _clean_unused_partial_update_info(); |
| |
| // clean unused rowsets in remote storage backends |
| for (auto data_dir : get_stores()) { |
| data_dir->perform_remote_rowset_gc(); |
| data_dir->perform_remote_tablet_gc(); |
| data_dir->update_trash_capacity(); |
| } |
| |
| return res; |
| } |
| |
| void StorageEngine::_clean_unused_rowset_metas() { |
| std::vector<RowsetMetaSharedPtr> invalid_rowset_metas; |
| auto clean_rowset_func = [this, &invalid_rowset_metas](TabletUid tablet_uid, RowsetId rowset_id, |
| std::string_view meta_str) -> bool { |
        // returning false breaks the meta iterator; return true to skip this error and continue
| RowsetMetaSharedPtr rowset_meta(new RowsetMeta()); |
| bool parsed = rowset_meta->init(meta_str); |
| if (!parsed) { |
| LOG(WARNING) << "parse rowset meta string failed for rowset_id:" << rowset_id; |
| invalid_rowset_metas.push_back(rowset_meta); |
| return true; |
| } |
| if (rowset_meta->tablet_uid() != tablet_uid) { |
| LOG(WARNING) << "tablet uid is not equal, skip the rowset" |
| << ", rowset_id=" << rowset_meta->rowset_id() |
| << ", in_put_tablet_uid=" << tablet_uid |
| << ", tablet_uid in rowset meta=" << rowset_meta->tablet_uid(); |
| invalid_rowset_metas.push_back(rowset_meta); |
| return true; |
| } |
| |
| TabletSharedPtr tablet = _tablet_manager->get_tablet(rowset_meta->tablet_id()); |
| if (tablet == nullptr) { |
| // tablet may be dropped |
| // TODO(cmy): this is better to be a VLOG, because drop table is a very common case. |
| // leave it as INFO log for observation. Maybe change it in future. |
| LOG(INFO) << "failed to find tablet " << rowset_meta->tablet_id() |
| << " for rowset: " << rowset_meta->rowset_id() << ", tablet may be dropped"; |
| invalid_rowset_metas.push_back(rowset_meta); |
| return true; |
| } |
| if (tablet->tablet_uid() != rowset_meta->tablet_uid()) { |
| // In this case, we get the tablet using the tablet id recorded in the rowset meta. |
| // but the uid in the tablet is different from the one recorded in the rowset meta. |
| // How this happened: |
            // Replica1 of Tablet A exists on BE1. Because of a clone task, a new replica2 is created on BE2,
            // and then replica1 is deleted from BE1. After some time, a replica is created again on BE1,
            // which creates a new tablet with the same id but a different uid.
            // And in the historical version, when we deleted the replica, we did not delete the corresponding
            // rowset meta, thus causing the original rowset meta to remain (with the same tablet id but a different uid).
| LOG(WARNING) << "rowset's tablet uid " << rowset_meta->tablet_uid() |
| << " does not equal to tablet uid: " << tablet->tablet_uid(); |
| invalid_rowset_metas.push_back(rowset_meta); |
| return true; |
| } |
| if (rowset_meta->rowset_state() == RowsetStatePB::VISIBLE && |
| (!tablet->rowset_meta_is_useful(rowset_meta))) { |
| LOG(INFO) << "rowset meta is not used any more, remove it. rowset_id=" |
| << rowset_meta->rowset_id(); |
| invalid_rowset_metas.push_back(rowset_meta); |
| } |
| return true; |
| }; |
| auto data_dirs = get_stores(); |
| for (auto data_dir : data_dirs) { |
| static_cast<void>( |
| RowsetMetaManager::traverse_rowset_metas(data_dir->get_meta(), clean_rowset_func)); |
| // 1. delete delete_bitmap |
| std::set<int64_t> tablets_to_save_meta; |
| for (auto& rowset_meta : invalid_rowset_metas) { |
| TabletSharedPtr tablet = _tablet_manager->get_tablet(rowset_meta->tablet_id()); |
| if (tablet && tablet->tablet_meta()->enable_unique_key_merge_on_write()) { |
| tablet->tablet_meta()->remove_rowset_delete_bitmap(rowset_meta->rowset_id(), |
| rowset_meta->version()); |
| tablets_to_save_meta.emplace(tablet->tablet_id()); |
| } |
| } |
| for (const auto& tablet_id : tablets_to_save_meta) { |
| auto tablet = _tablet_manager->get_tablet(tablet_id); |
| if (tablet) { |
| std::shared_lock rlock(tablet->get_header_lock()); |
| tablet->save_meta(); |
| } |
| } |
| // 2. delete rowset meta |
| for (auto& rowset_meta : invalid_rowset_metas) { |
| static_cast<void>(RowsetMetaManager::remove( |
| data_dir->get_meta(), rowset_meta->tablet_uid(), rowset_meta->rowset_id())); |
| } |
| LOG(INFO) << "remove " << invalid_rowset_metas.size() |
| << " invalid rowset meta from dir: " << data_dir->path(); |
| invalid_rowset_metas.clear(); |
| } |
| } |
| |
| void StorageEngine::_clean_unused_binlog_metas() { |
| std::vector<std::string> unused_binlog_key_suffixes; |
| auto unused_binlog_collector = [this, &unused_binlog_key_suffixes](std::string_view key, |
| std::string_view value, |
| bool need_check) -> bool { |
| if (need_check) { |
| BinlogMetaEntryPB binlog_meta_pb; |
| if (UNLIKELY(!binlog_meta_pb.ParseFromArray(value.data(), |
| cast_set<int>(value.size())))) { |
| LOG(WARNING) << "parse rowset meta string failed for binlog meta key: " << key; |
| } else if (_tablet_manager->get_tablet(binlog_meta_pb.tablet_id()) == nullptr) { |
| LOG(INFO) << "failed to find tablet " << binlog_meta_pb.tablet_id() |
| << " for binlog rowset: " << binlog_meta_pb.rowset_id() |
| << ", tablet may be dropped"; |
| } else { |
| return false; |
| } |
| } |
| |
| unused_binlog_key_suffixes.emplace_back(key.substr(kBinlogMetaPrefix.size())); |
| return true; |
| }; |
| auto data_dirs = get_stores(); |
| for (auto data_dir : data_dirs) { |
| static_cast<void>(RowsetMetaManager::traverse_binlog_metas(data_dir->get_meta(), |
| unused_binlog_collector)); |
| for (const auto& suffix : unused_binlog_key_suffixes) { |
| static_cast<void>(RowsetMetaManager::remove_binlog(data_dir->get_meta(), suffix)); |
| } |
| LOG(INFO) << "remove " << unused_binlog_key_suffixes.size() |
| << " invalid binlog meta from dir: " << data_dir->path(); |
| unused_binlog_key_suffixes.clear(); |
| } |
| } |
| |
| void StorageEngine::_clean_unused_delete_bitmap() { |
| std::unordered_set<int64_t> removed_tablets; |
| auto clean_delete_bitmap_func = [this, &removed_tablets](int64_t tablet_id, int64_t version, |
| std::string_view val) -> bool { |
| TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id); |
| if (tablet == nullptr) { |
| if (removed_tablets.insert(tablet_id).second) { |
| LOG(INFO) << "clean ununsed delete bitmap for deleted tablet, tablet_id: " |
| << tablet_id; |
| } |
| } |
| return true; |
| }; |
| auto data_dirs = get_stores(); |
| for (auto data_dir : data_dirs) { |
| static_cast<void>(TabletMetaManager::traverse_delete_bitmap(data_dir->get_meta(), |
| clean_delete_bitmap_func)); |
| for (auto id : removed_tablets) { |
| static_cast<void>( |
| TabletMetaManager::remove_old_version_delete_bitmap(data_dir, id, INT64_MAX)); |
| } |
| LOG(INFO) << "removed invalid delete bitmap from dir: " << data_dir->path() |
| << ", deleted tablets size: " << removed_tablets.size(); |
| removed_tablets.clear(); |
| } |
| } |
| |
| void StorageEngine::_clean_unused_pending_publish_info() { |
| std::vector<std::pair<int64_t, int64_t>> removed_infos; |
| auto clean_pending_publish_info_func = [this, &removed_infos](int64_t tablet_id, |
| int64_t publish_version, |
| std::string_view info) -> bool { |
| TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id); |
| if (tablet == nullptr) { |
| removed_infos.emplace_back(tablet_id, publish_version); |
| } |
| return true; |
| }; |
| auto data_dirs = get_stores(); |
| for (auto data_dir : data_dirs) { |
| static_cast<void>(TabletMetaManager::traverse_pending_publish( |
| data_dir->get_meta(), clean_pending_publish_info_func)); |
| for (auto& [tablet_id, publish_version] : removed_infos) { |
| static_cast<void>(TabletMetaManager::remove_pending_publish_info(data_dir, tablet_id, |
| publish_version)); |
| } |
| LOG(INFO) << "removed invalid pending publish info from dir: " << data_dir->path() |
| << ", deleted pending publish info size: " << removed_infos.size(); |
| removed_infos.clear(); |
| } |
| } |
| |
| void StorageEngine::_clean_unused_partial_update_info() { |
| std::vector<std::tuple<int64_t, int64_t, int64_t>> remove_infos; |
| auto unused_partial_update_info_collector = |
| [this, &remove_infos](int64_t tablet_id, int64_t partition_id, int64_t txn_id, |
| std::string_view value) -> bool { |
| TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id); |
| if (tablet == nullptr) { |
| remove_infos.emplace_back(tablet_id, partition_id, txn_id); |
| return true; |
| } |
| TxnState txn_state = |
| _txn_manager->get_txn_state(partition_id, txn_id, tablet_id, tablet->tablet_uid()); |
| if (txn_state == TxnState::NOT_FOUND || txn_state == TxnState::ABORTED || |
| txn_state == TxnState::DELETED) { |
| remove_infos.emplace_back(tablet_id, partition_id, txn_id); |
| return true; |
| } |
| return true; |
| }; |
| auto data_dirs = get_stores(); |
| for (auto* data_dir : data_dirs) { |
| static_cast<void>(RowsetMetaManager::traverse_partial_update_info( |
| data_dir->get_meta(), unused_partial_update_info_collector)); |
| static_cast<void>( |
| RowsetMetaManager::remove_partial_update_infos(data_dir->get_meta(), remove_infos)); |
| } |
| } |
| |
| void StorageEngine::gc_binlogs(const std::unordered_map<int64_t, int64_t>& gc_tablet_infos) { |
| for (auto [tablet_id, version] : gc_tablet_infos) { |
| LOG(INFO) << fmt::format("start to gc binlogs for tablet_id: {}, version: {}", tablet_id, |
| version); |
| |
| TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id); |
| if (tablet == nullptr) { |
| LOG(WARNING) << fmt::format("tablet_id: {} not found", tablet_id); |
| continue; |
| } |
| tablet->gc_binlogs(version); |
| } |
| } |
| |
| void StorageEngine::_clean_unused_txns() { |
| std::set<TabletInfo> tablet_infos; |
| _txn_manager->get_all_related_tablets(&tablet_infos); |
| for (auto& tablet_info : tablet_infos) { |
| TabletSharedPtr tablet = |
| _tablet_manager->get_tablet(tablet_info.tablet_id, tablet_info.tablet_uid, true); |
| if (tablet == nullptr) { |
            // TODO(ygl): should check if the tablet is still in meta; it's an improvement
| // case 1: tablet still in meta, just remove from memory |
| // case 2: tablet not in meta store, remove rowset from meta |
| // currently just remove them from memory |
| // nullptr to indicate not remove them from meta store |
| _txn_manager->force_rollback_tablet_related_txns(nullptr, tablet_info.tablet_id, |
| tablet_info.tablet_uid); |
| } |
| } |
| } |
| |
| Status StorageEngine::_do_sweep(const std::string& scan_root, const time_t& local_now, |
| const int32_t expire) { |
| Status res = Status::OK(); |
| bool exists = true; |
| RETURN_IF_ERROR(io::global_local_filesystem()->exists(scan_root, &exists)); |
| if (!exists) { |
        // dir does not exist, no need to sweep trash.
| return res; |
| } |
| |
| int curr_sweep_batch_size = 0; |
| try { |
        // Sort paths by name, that is, by delete time.
        std::vector<path> sorted_paths;
        std::copy(directory_iterator(scan_root), directory_iterator(),
                  std::back_inserter(sorted_paths));
        std::sort(sorted_paths.begin(), sorted_paths.end());
        for (const auto& sorted_path : sorted_paths) {
| string dir_name = sorted_path.filename().string(); |
| string str_time = dir_name.substr(0, dir_name.find('.')); |
| tm local_tm_create; |
| local_tm_create.tm_isdst = 0; |
| if (strptime(str_time.c_str(), "%Y%m%d%H%M%S", &local_tm_create) == nullptr) { |
| res = Status::Error<OS_ERROR>("fail to strptime time. time={}", str_time); |
| continue; |
| } |
| |
| int32_t actual_expire = expire; |
            // try to get the timeout from the dir name; old snapshot dirs do not contain a timeout
            // e.g. 20190818221123.3.86400, where 86400 is the timeout in seconds
| size_t pos = dir_name.find('.', str_time.size() + 1); |
| if (pos != string::npos) { |
| actual_expire = std::stoi(dir_name.substr(pos + 1)); |
| } |
| VLOG_TRACE << "get actual expire time " << actual_expire << " of dir: " << dir_name; |
| |
| string path_name = sorted_path.string(); |
| if (difftime(local_now, mktime(&local_tm_create)) >= actual_expire) { |
| res = io::global_local_filesystem()->delete_directory(path_name); |
| LOG(INFO) << "do sweep delete directory " << path_name << " local_now " << local_now |
| << "actual_expire " << actual_expire << " res " << res; |
| if (!res.ok()) { |
| continue; |
| } |
| |
| curr_sweep_batch_size++; |
| if (config::garbage_sweep_batch_size > 0 && |
| curr_sweep_batch_size >= config::garbage_sweep_batch_size) { |
| curr_sweep_batch_size = 0; |
| std::this_thread::sleep_for(std::chrono::milliseconds(1)); |
| } |
| } else { |
                // Because entries are sorted by filename, i.e. by create time, all remaining entries are not yet expired.
| break; |
| } |
| } |
| } catch (...) { |
| res = Status::Error<IO_ERROR>("Exception occur when scan directory. path_desc={}", |
| scan_root); |
| } |
| |
| return res; |
| } |
| |
// "beta" selects BETA_ROWSET; "alpha" is deprecated but still accepted with a warning; any other value aborts startup
| void StorageEngine::_parse_default_rowset_type() { |
| std::string default_rowset_type_config = config::default_rowset_type; |
| boost::to_upper(default_rowset_type_config); |
| if (default_rowset_type_config == "BETA") { |
| _default_rowset_type = BETA_ROWSET; |
| } else if (default_rowset_type_config == "ALPHA") { |
| _default_rowset_type = ALPHA_ROWSET; |
| LOG(WARNING) << "default_rowset_type in be.conf should be set to beta, alpha is not " |
| "supported any more"; |
| } else { |
| LOG(FATAL) << "unknown value " << default_rowset_type_config |
| << " in default_rowset_type in be.conf"; |
| } |
| } |
| |
| void StorageEngine::start_delete_unused_rowset() { |
| DBUG_EXECUTE_IF("StorageEngine::start_delete_unused_rowset.block", DBUG_BLOCK); |
| LOG(INFO) << "start to delete unused rowset, size: " << _unused_rowsets.size() |
| << ", unused delete bitmap size: " << _unused_delete_bitmap.size(); |
| std::vector<RowsetSharedPtr> unused_rowsets_copy; |
| unused_rowsets_copy.reserve(_unused_rowsets.size()); |
| auto due_to_use_count = 0; |
| auto due_to_not_delete_file = 0; |
| auto due_to_delayed_expired_ts = 0; |
| std::set<int64_t> tablets_to_save_meta; |
| { |
| std::lock_guard<std::mutex> lock(_gc_mutex); |
| for (auto it = _unused_rowsets.begin(); it != _unused_rowsets.end();) { |
| auto&& rs = it->second; |
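            // use_count() == 1 means this map holds the only reference to the rowset, so no
            // query or compaction is still reading it and its files can be deleted safely.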
| if (rs.use_count() == 1 && rs->need_delete_file()) { |
| // remote rowset data will be reclaimed by `remove_unused_remote_files` |
| if (rs->is_local()) { |
| unused_rowsets_copy.push_back(std::move(rs)); |
| } |
| it = _unused_rowsets.erase(it); |
| } else { |
| if (rs.use_count() != 1) { |
| ++due_to_use_count; |
| } else if (!rs->need_delete_file()) { |
| ++due_to_not_delete_file; |
| } else { |
| ++due_to_delayed_expired_ts; |
| } |
| ++it; |
| } |
| } |
| // check remove delete bitmaps |
| for (auto it = _unused_delete_bitmap.begin(); it != _unused_delete_bitmap.end();) { |
| auto tablet_id = std::get<0>(*it); |
| auto tablet = _tablet_manager->get_tablet(tablet_id); |
| if (tablet == nullptr) { |
| it = _unused_delete_bitmap.erase(it); |
| continue; |
| } |
| auto& rowset_ids = std::get<1>(*it); |
| auto& key_ranges = std::get<2>(*it); |
| bool find_unused_rowset = false; |
| for (const auto& rowset_id : rowset_ids) { |
| if (_unused_rowsets.find(rowset_id) != _unused_rowsets.end()) { |
| VLOG_DEBUG << "can not remove pre rowset delete bitmap because rowset is in use" |
| << ", tablet_id=" << tablet_id |
| << ", rowset_id=" << rowset_id.to_string(); |
| find_unused_rowset = true; |
| break; |
| } |
| } |
| if (find_unused_rowset) { |
| ++it; |
| continue; |
| } |
| tablet->tablet_meta()->delete_bitmap().remove(key_ranges); |
| tablets_to_save_meta.emplace(tablet_id); |
| it = _unused_delete_bitmap.erase(it); |
| } |
| } |
| LOG(INFO) << "collected " << unused_rowsets_copy.size() << " unused rowsets to remove, skipped " |
| << due_to_use_count << " rowsets due to use count > 1, skipped " |
              << due_to_not_delete_file << " rowsets that do not need file deletion, skipped "
| << due_to_delayed_expired_ts << " rowsets due to delayed expired timestamp. left " |
| << _unused_delete_bitmap.size() << " unused delete bitmap."; |
| for (auto&& rs : unused_rowsets_copy) { |
| VLOG_NOTICE << "start to remove rowset:" << rs->rowset_id() |
| << ", version:" << rs->version(); |
| // delete delete_bitmap of unused rowsets |
| if (auto tablet = _tablet_manager->get_tablet(rs->rowset_meta()->tablet_id()); |
| tablet && tablet->enable_unique_key_merge_on_write()) { |
| tablet->tablet_meta()->remove_rowset_delete_bitmap(rs->rowset_id(), rs->version()); |
| tablets_to_save_meta.emplace(tablet->tablet_id()); |
| } |
| Status status = rs->remove(); |
| unused_rowsets_counter << -1; |
| VLOG_NOTICE << "remove rowset:" << rs->rowset_id() << " finished. status:" << status; |
| } |
| for (const auto& tablet_id : tablets_to_save_meta) { |
| auto tablet = _tablet_manager->get_tablet(tablet_id); |
| if (tablet) { |
| std::shared_lock rlock(tablet->get_header_lock()); |
| tablet->save_meta(); |
| } |
| } |
| LOG(INFO) << "removed all collected unused rowsets"; |
| } |
| |
| void StorageEngine::add_unused_rowset(RowsetSharedPtr rowset) { |
| if (rowset == nullptr) { |
| return; |
| } |
| VLOG_NOTICE << "add unused rowset, rowset id:" << rowset->rowset_id() |
| << ", version:" << rowset->version(); |
| std::lock_guard<std::mutex> lock(_gc_mutex); |
| auto it = _unused_rowsets.find(rowset->rowset_id()); |
| if (it == _unused_rowsets.end()) { |
| rowset->set_need_delete_file(); |
| rowset->close(); |
| _unused_rowsets[rowset->rowset_id()] = std::move(rowset); |
| unused_rowsets_counter << 1; |
| } |
| } |
| |
| void StorageEngine::add_unused_delete_bitmap_key_ranges(int64_t tablet_id, |
| const std::vector<RowsetId>& rowsets, |
| const DeleteBitmapKeyRanges& key_ranges) { |
| VLOG_NOTICE << "add unused delete bitmap key ranges, tablet id:" << tablet_id; |
| std::lock_guard<std::mutex> lock(_gc_mutex); |
| _unused_delete_bitmap.push_back(std::make_tuple(tablet_id, rowsets, key_ranges)); |
| } |
| |
// TODO(zc): refactor this function
| Status StorageEngine::create_tablet(const TCreateTabletReq& request, RuntimeProfile* profile) { |
| // Get all available stores, use ref_root_path if the caller specified |
| std::vector<DataDir*> stores; |
| { |
| SCOPED_TIMER(ADD_TIMER(profile, "GetStores")); |
| stores = get_stores_for_create_tablet(request.partition_id, request.storage_medium); |
| } |
| if (stores.empty()) { |
| return Status::Error<CE_CMD_PARAMS_ERROR>( |
| "there is no available disk that can be used to create tablet."); |
| } |
| return _tablet_manager->create_tablet(request, stores, profile); |
| } |
| |
| Result<BaseTabletSPtr> StorageEngine::get_tablet(int64_t tablet_id, SyncRowsetStats* sync_stats, |
| bool force_use_cache) { |
| BaseTabletSPtr tablet; |
| std::string err; |
| tablet = _tablet_manager->get_tablet(tablet_id, true, &err); |
| if (tablet == nullptr) { |
| return unexpected( |
| Status::InternalError("failed to get tablet: {}, reason: {}", tablet_id, err)); |
| } |
| return tablet; |
| } |
| |
| Status StorageEngine::obtain_shard_path(TStorageMedium::type storage_medium, int64_t path_hash, |
| std::string* shard_path, DataDir** store, |
| int64_t partition_id) { |
| LOG(INFO) << "begin to process obtain root path. storage_medium=" << storage_medium; |
| |
| if (shard_path == nullptr) { |
| return Status::Error<CE_CMD_PARAMS_ERROR>( |
| "invalid output parameter which is null pointer."); |
| } |
| |
| auto stores = get_stores_for_create_tablet(partition_id, storage_medium); |
| if (stores.empty()) { |
| return Status::Error<NO_AVAILABLE_ROOT_PATH>( |
| "no available disk can be used to create tablet."); |
| } |
| |
| *store = nullptr; |
| if (path_hash != -1) { |
| for (auto data_dir : stores) { |
| if (data_dir->path_hash() == path_hash) { |
| *store = data_dir; |
| break; |
| } |
| } |
| } |
| if (*store == nullptr) { |
| *store = stores[0]; |
| } |
| |
| uint64_t shard = (*store)->get_shard(); |
| |
| std::stringstream root_path_stream; |
| root_path_stream << (*store)->path() << "/" << DATA_PREFIX << "/" << shard; |
| *shard_path = root_path_stream.str(); |
| |
| LOG(INFO) << "success to process obtain root path. path=" << *shard_path; |
| return Status::OK(); |
| } |
| |
| Status StorageEngine::load_header(const string& shard_path, const TCloneReq& request, |
| bool restore) { |
| LOG(INFO) << "begin to process load headers." |
| << "tablet_id=" << request.tablet_id << ", schema_hash=" << request.schema_hash; |
| Status res = Status::OK(); |
| |
| DataDir* store = nullptr; |
| { |
| // TODO(zc) |
| try { |
| auto store_path = |
| std::filesystem::path(shard_path).parent_path().parent_path().string(); |
| store = get_store(store_path); |
| if (store == nullptr) { |
| return Status::Error<INVALID_ROOT_PATH>("invalid shard path, path={}", shard_path); |
| } |
| } catch (...) { |
| return Status::Error<INVALID_ROOT_PATH>("invalid shard path, path={}", shard_path); |
| } |
| } |
| |
| std::stringstream schema_hash_path_stream; |
| schema_hash_path_stream << shard_path << "/" << request.tablet_id << "/" << request.schema_hash; |
    // Not entirely sure: both the reload and restore tablet actions call this API;
    // reset the tablet uid here
| |
| string header_path = TabletMeta::construct_header_file_path(schema_hash_path_stream.str(), |
| request.tablet_id); |
| res = _tablet_manager->load_tablet_from_dir(store, request.tablet_id, request.schema_hash, |
| schema_hash_path_stream.str(), false, restore); |
| if (!res.ok()) { |
| LOG(WARNING) << "fail to process load headers. res=" << res; |
| return res; |
| } |
| |
| LOG(INFO) << "success to process load headers."; |
| return res; |
| } |
| |
| void BaseStorageEngine::register_report_listener(ReportWorker* listener) { |
| std::lock_guard<std::mutex> l(_report_mtx); |
| if (std::find(_report_listeners.begin(), _report_listeners.end(), listener) != |
| _report_listeners.end()) [[unlikely]] { |
| return; |
| } |
| _report_listeners.push_back(listener); |
| } |
| |
| void BaseStorageEngine::deregister_report_listener(ReportWorker* listener) { |
| std::lock_guard<std::mutex> l(_report_mtx); |
| if (auto it = std::find(_report_listeners.begin(), _report_listeners.end(), listener); |
| it != _report_listeners.end()) { |
| _report_listeners.erase(it); |
| } |
| } |
| |
| void BaseStorageEngine::notify_listeners() { |
| std::lock_guard<std::mutex> l(_report_mtx); |
| for (auto& listener : _report_listeners) { |
| listener->notify(); |
| } |
| } |
| |
| bool BaseStorageEngine::notify_listener(std::string_view name) { |
| bool found = false; |
| std::lock_guard<std::mutex> l(_report_mtx); |
| for (auto& listener : _report_listeners) { |
| if (listener->name() == name) { |
| listener->notify(); |
| found = true; |
| } |
| } |
| return found; |
| } |
| |
| void BaseStorageEngine::_evict_quring_rowset_thread_callback() { |
| int32_t interval = config::quering_rowsets_evict_interval; |
| do { |
| _evict_querying_rowset(); |
| interval = config::quering_rowsets_evict_interval; |
| if (interval <= 0) { |
| LOG(WARNING) << "quering_rowsets_evict_interval config is illegal: " << interval |
| << ", force set to 1"; |
| interval = 1; |
| } |
| } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))); |
| } |
| |
// check whether any unused rowset's id equals rowset_id
| bool StorageEngine::check_rowset_id_in_unused_rowsets(const RowsetId& rowset_id) { |
| std::lock_guard<std::mutex> lock(_gc_mutex); |
| return _unused_rowsets.contains(rowset_id); |
| } |
| |
| PendingRowsetGuard StorageEngine::add_pending_rowset(const RowsetWriterContext& ctx) { |
| if (ctx.is_local_rowset()) { |
| return _pending_local_rowsets.add(ctx.rowset_id); |
| } |
| return _pending_remote_rowsets.add(ctx.rowset_id); |
| } |
| |
| bool StorageEngine::get_peer_replica_info(int64_t tablet_id, TReplicaInfo* replica, |
| std::string* token) { |
| TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id); |
| if (tablet == nullptr) { |
| LOG(WARNING) << "tablet is no longer exist: tablet_id=" << tablet_id; |
| return false; |
| } |
| std::unique_lock<std::mutex> lock(_peer_replica_infos_mutex); |
| if (_peer_replica_infos.contains(tablet_id) && |
| _peer_replica_infos[tablet_id].replica_id != tablet->replica_id()) { |
| *replica = _peer_replica_infos[tablet_id]; |
| *token = _token; |
| return true; |
| } |
| return false; |
| } |
| |
| bool StorageEngine::get_peers_replica_backends(int64_t tablet_id, std::vector<TBackend>* backends) { |
| TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id); |
| if (tablet == nullptr) { |
| LOG(WARNING) << "tablet is no longer exist: tablet_id=" << tablet_id; |
| return false; |
| } |
| int64_t cur_time = UnixMillis(); |
| if (cur_time - _last_get_peers_replica_backends_time_ms < 10000) { |
| LOG_WARNING("failed to get peers replica backens.") |
| .tag("tablet_id", tablet_id) |
| .tag("last time", _last_get_peers_replica_backends_time_ms) |
| .tag("cur time", cur_time); |
| return false; |
| } |
| LOG_INFO("start get peers replica backends info.").tag("tablet id", tablet_id); |
| ClusterInfo* cluster_info = ExecEnv::GetInstance()->cluster_info(); |
| if (cluster_info == nullptr) { |
| LOG(WARNING) << "Have not get FE Master heartbeat yet"; |
| return false; |
| } |
| TNetworkAddress master_addr = cluster_info->master_fe_addr; |
| if (master_addr.hostname.empty() || master_addr.port == 0) { |
| LOG(WARNING) << "Have not get FE Master heartbeat yet"; |
| return false; |
| } |
| TGetTabletReplicaInfosRequest request; |
| TGetTabletReplicaInfosResult result; |
| request.tablet_ids.emplace_back(tablet_id); |
| Status rpc_st = ThriftRpcHelper::rpc<FrontendServiceClient>( |
| master_addr.hostname, master_addr.port, |
| [&request, &result](FrontendServiceConnection& client) { |
| client->getTabletReplicaInfos(result, request); |
| }); |
| |
| if (!rpc_st.ok()) { |
| LOG(WARNING) << "Failed to get tablet replica infos, encounter rpc failure, " |
| "tablet id: " |
| << tablet_id; |
| return false; |
| } |
| std::unique_lock<std::mutex> lock(_peer_replica_infos_mutex); |
| if (result.tablet_replica_infos.contains(tablet_id)) { |
| std::vector<TReplicaInfo> reps = result.tablet_replica_infos[tablet_id]; |
| if (reps.empty()) [[unlikely]] { |
| VLOG_DEBUG << "get_peers_replica_backends reps is empty, maybe this tablet is in " |
| "schema change. Go to FE to see more info. Tablet id: " |
| << tablet_id; |
| } |
| for (const auto& rep : reps) { |
| if (rep.replica_id != tablet->replica_id()) { |
| TBackend backend; |
| backend.__set_host(rep.host); |
| backend.__set_be_port(rep.be_port); |
| backend.__set_http_port(rep.http_port); |
| backend.__set_brpc_port(rep.brpc_port); |
| if (rep.__isset.is_alive) { |
| backend.__set_is_alive(rep.is_alive); |
| } |
| if (rep.__isset.backend_id) { |
| backend.__set_id(rep.backend_id); |
| } |
| backends->emplace_back(backend); |
| std::stringstream backend_string; |
| backend.printTo(backend_string); |
| LOG_INFO("get 1 peer replica backend info.") |
| .tag("tablet id", tablet_id) |
| .tag("backend info", backend_string.str()); |
| } |
| } |
| _last_get_peers_replica_backends_time_ms = UnixMillis(); |
| LOG_INFO("succeed get peers replica backends info.") |
| .tag("tablet id", tablet_id) |
| .tag("replica num", backends->size()); |
| return true; |
| } |
| return false; |
| } |
| |
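| // Return true when the cached peer replica info for this tablet refers to a |
| // replica other than the local one (in BE_TEST builds the decision is stubbed |
| // by tablet_id parity). |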
| bool StorageEngine::should_fetch_from_peer(int64_t tablet_id) { |
| #ifdef BE_TEST |
| return tablet_id % 2 == 0; |
| #endif |
| TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id); |
| if (tablet == nullptr) { |
| LOG(WARNING) << "tablet is no longer exist: tablet_id=" << tablet_id; |
| return false; |
| } |
| std::unique_lock<std::mutex> lock(_peer_replica_infos_mutex); |
| if (_peer_replica_infos.contains(tablet_id)) { |
| return _peer_replica_infos[tablet_id].replica_id != tablet->replica_id(); |
| } |
| return false; |
| } |
| |
| // Return json: |
| // { |
| // "CumulativeCompaction": { |
| // "/home/disk1" : [10001, 10002], |
| // "/home/disk2" : [10003] |
| // }, |
| // "BaseCompaction": { |
| // "/home/disk1" : [10001, 10002], |
| // "/home/disk2" : [10003] |
| // } |
| // } |
| void StorageEngine::get_compaction_status_json(std::string* result) { |
| _compaction_submit_registry.jsonfy_compaction_status(result); |
| } |
| |
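| // Track a rowset that is currently referenced by a query so it can be looked |
| // up by rowset id while the query runs; entries are removed later by |
| // _evict_querying_rowset() once their delayed expiration timestamp has passed. |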
| void BaseStorageEngine::add_quering_rowset(RowsetSharedPtr rs) { |
| std::lock_guard<std::mutex> lock(_quering_rowsets_mutex); |
| _querying_rowsets.emplace(rs->rowset_id(), rs); |
| } |
| |
| RowsetSharedPtr BaseStorageEngine::get_quering_rowset(RowsetId rs_id) { |
| std::lock_guard<std::mutex> lock(_quering_rowsets_mutex); |
| auto it = _querying_rowsets.find(rs_id); |
| if (it != _querying_rowsets.end()) { |
| return it->second; |
| } |
| return nullptr; |
| } |
| |
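| // Drop querying rowsets whose delayed expiration timestamp has passed, then |
| // let the id manager GC its expired id-to-file mappings. |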
| void BaseStorageEngine::_evict_querying_rowset() { |
| { |
| std::lock_guard<std::mutex> lock(_quering_rowsets_mutex); |
| for (auto it = _querying_rowsets.begin(); it != _querying_rowsets.end();) { |
| uint64_t now = UnixSeconds(); |
| // We delay the GC time of this rowset since it may still be needed, see #20732 |
| if (now > it->second->delayed_expired_timestamp()) { |
| it = _querying_rowsets.erase(it); |
| } else { |
| ++it; |
| } |
| } |
| } |
| |
| uint64_t now = UnixSeconds(); |
| ExecEnv::GetInstance()->get_id_manager()->gc_expired_id_file_map(now); |
| } |
| |
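| // Decide whether a large cumulative compaction task should be delayed so that |
| // the last free thread in the pool stays available for small tasks; see the |
| // case analysis below. |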
| bool BaseStorageEngine::_should_delay_large_task() { |
| DCHECK_GE(_cumu_compaction_thread_pool->max_threads(), |
| _cumu_compaction_thread_pool_used_threads); |
| DCHECK_GE(_cumu_compaction_thread_pool_small_tasks_running, 0); |
| // Case 1: Multiple threads available => accept large task |
| if (_cumu_compaction_thread_pool->max_threads() - _cumu_compaction_thread_pool_used_threads > |
| 0) { |
| return false; // No delay needed |
| } |
| // Case 2: Only one thread left => accept large task only if another small task is already running |
| if (_cumu_compaction_thread_pool_small_tasks_running > 0) { |
| return false; // No delay needed |
| } |
| // Case 3: Only one thread left, this is a large task, and no small tasks are running |
| // Delay this task to reserve capacity for potential small tasks |
| return true; // Delay this large task |
| } |
| |
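| // Record a storage path as broken. Returns true if the path was newly added; |
| // any change to the set is persisted via _persist_broken_paths(). |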
| bool StorageEngine::add_broken_path(std::string path) { |
| std::lock_guard<std::mutex> lock(_broken_paths_mutex); |
| auto success = _broken_paths.emplace(path).second; |
| if (success) { |
| static_cast<void>(_persist_broken_paths()); |
| } |
| return success; |
| } |
| |
| bool StorageEngine::remove_broken_path(std::string path) { |
| std::lock_guard<std::mutex> lock(_broken_paths_mutex); |
| auto count = _broken_paths.erase(path); |
| if (count > 0) { |
| static_cast<void>(_persist_broken_paths()); |
| } |
| return count > 0; |
| } |
| |
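| // Serialize the broken paths as a ';'-separated list and store it in the |
| // broken_storage_path config entry. Note that nothing is written back when |
| // the set is empty. |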
| Status StorageEngine::_persist_broken_paths() { |
| std::string config_value; |
| for (const std::string& path : _broken_paths) { |
| config_value += path + ";"; |
| } |
| |
| if (config_value.length() > 0) { |
| auto st = config::set_config("broken_storage_path", config_value, true); |
| LOG(INFO) << "persist broken_storage_path " << config_value << st; |
| return st; |
| } |
| |
| return Status::OK(); |
| } |
| |
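| // Build and submit a high-priority CLONE task so the local replica can fetch a |
| // missing version from one of its peer replicas. Peer backends are resolved via |
| // get_peers_replica_backends(); the task goes to the CLONE worker pool through |
| // submit_high_prior_and_cancel_low(). |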
| Status StorageEngine::submit_clone_task(Tablet* tablet, int64_t version) { |
| std::vector<TBackend> backends; |
| if (!get_peers_replica_backends(tablet->tablet_id(), &backends)) { |
| return Status::Error<ErrorCode::INTERNAL_ERROR, false>( |
| "get_peers_replica_backends failed."); |
| } |
| TAgentTaskRequest task; |
| TCloneReq req; |
| req.__set_tablet_id(tablet->tablet_id()); |
| req.__set_schema_hash(tablet->schema_hash()); |
| req.__set_src_backends(backends); |
| req.__set_version(version); |
| req.__set_replica_id(tablet->replica_id()); |
| req.__set_partition_id(tablet->partition_id()); |
| req.__set_table_id(tablet->table_id()); |
| task.__set_task_type(TTaskType::CLONE); |
| task.__set_clone_req(req); |
| task.__set_priority(TPriority::HIGH); |
| task.__set_signature(tablet->tablet_id()); |
| LOG_INFO("BE start to submit missing rowset clone task.") |
| .tag("tablet_id", tablet->tablet_id()) |
| .tag("version", version) |
| .tag("replica_id", tablet->replica_id()) |
| .tag("partition_id", tablet->partition_id()) |
| .tag("table_id", tablet->table_id()); |
| RETURN_IF_ERROR(assert_cast<PriorTaskWorkerPool*>(workers->at(TTaskType::CLONE).get()) |
| ->submit_high_prior_and_cancel_low(task)); |
| return Status::OK(); |
| } |
| |
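| // CreateTabletRRIdxCache caches the last round-robin index used when picking a |
| // data dir for tablet creation, keyed by a caller-provided string. get_index() |
| // returns -1 on a cache miss; set_index() stores the index to start from next. |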
| int CreateTabletRRIdxCache::get_index(const std::string& key) { |
| auto* lru_handle = lookup(key); |
| if (lru_handle) { |
| Defer release([cache = this, lru_handle] { cache->release(lru_handle); }); |
| auto* value = (CacheValue*)LRUCachePolicy::value(lru_handle); |
| VLOG_DEBUG << "use create tablet idx cache key=" << key << " value=" << value->idx; |
| return value->idx; |
| } |
| return -1; |
| } |
| |
| void CreateTabletRRIdxCache::set_index(const std::string& key, int next_idx) { |
| assert(next_idx >= 0); |
| auto* value = new CacheValue; |
| value->idx = next_idx; |
| auto* lru_handle = insert(key, value, 1, sizeof(int), CachePriority::NORMAL); |
| release(lru_handle); |
| } |
| #include "common/compile_check_end.h" |
| } // namespace doris |