blob: 126e9701a79af4c4932df1a0afad0adba9b76510 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "cloud/cloud_storage_engine.h"
#include <bvar/reducer.h>
#include <gen_cpp/PlanNodes_types.h>
#include <gen_cpp/cloud.pb.h>
#include <gen_cpp/olap_file.pb.h>
#include <rapidjson/document.h>
#include <rapidjson/encodings.h>
#include <rapidjson/prettywriter.h>
#include <rapidjson/stringbuffer.h>
#include <algorithm>
#include <memory>
#include <variant>
#include "cloud/cloud_base_compaction.h"
#include "cloud/cloud_compaction_stop_token.h"
#include "cloud/cloud_cumulative_compaction.h"
#include "cloud/cloud_cumulative_compaction_policy.h"
#include "cloud/cloud_full_compaction.h"
#include "cloud/cloud_index_change_compaction.h"
#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_snapshot_mgr.h"
#include "cloud/cloud_tablet_hotspot.h"
#include "cloud/cloud_tablet_mgr.h"
#include "cloud/cloud_txn_delete_bitmap_cache.h"
#include "cloud/cloud_warm_up_manager.h"
#include "cloud/config.h"
#include "common/config.h"
#include "common/signal_handler.h"
#include "common/status.h"
#include "io/cache/block_file_cache_downloader.h"
#include "io/cache/block_file_cache_factory.h"
#include "io/cache/file_cache_common.h"
#include "io/fs/file_system.h"
#include "io/fs/hdfs_file_system.h"
#include "io/fs/s3_file_system.h"
#include "io/hdfs_util.h"
#include "io/io_common.h"
#include "olap/cumulative_compaction_policy.h"
#include "olap/cumulative_compaction_time_series_policy.h"
#include "olap/memtable_flush_executor.h"
#include "olap/storage_policy.h"
#include "runtime/memory/cache_manager.h"
#include "util/parse_util.h"
#include "util/time.h"
#include "vec/common/assert_cast.h"
namespace doris {
#include "common/compile_check_begin.h"
using namespace std::literals;
// Running-task counters exported via bvar so compaction load can be observed
// by the monitoring system (one counter per compaction kind).
bvar::Adder<uint64_t> g_base_compaction_running_task_count("base_compaction_running_task_count");
bvar::Adder<uint64_t> g_full_compaction_running_task_count("full_compaction_running_task_count");
bvar::Adder<uint64_t> g_cumu_compaction_running_task_count(
        "cumulative_compaction_running_task_count");
int get_cumu_thread_num() {
if (config::max_cumu_compaction_threads > 0) {
return config::max_cumu_compaction_threads;
}
int num_cores = doris::CpuInfo::num_cores();
return std::min(std::max(int(num_cores * config::cumu_compaction_thread_num_factor), 2), 20);
}
int get_base_thread_num() {
if (config::max_base_compaction_threads > 0) {
return config::max_base_compaction_threads;
}
int num_cores = doris::CpuInfo::num_cores();
return std::min(std::max(int(num_cores * config::base_compaction_thread_num_factor), 1), 10);
}
// Construct the cloud storage engine: wire up the cloud meta manager and the
// tablet manager, and register the available cumulative compaction policies.
CloudStorageEngine::CloudStorageEngine(const EngineOptions& options)
        : BaseStorageEngine(Type::CLOUD, options.backend_uid),
          _meta_mgr(std::make_unique<cloud::CloudMetaMgr>()),
          _tablet_mgr(std::make_unique<CloudTabletMgr>(*this)),
          _options(options) {
    // Both policies are created up front; the per-tablet policy is presumably
    // looked up from this map later (consumers not visible in this file).
    _cumulative_compaction_policies[CUMULATIVE_SIZE_BASED_POLICY] =
            std::make_shared<CloudSizeBasedCumulativeCompactionPolicy>();
    _cumulative_compaction_policies[CUMULATIVE_TIME_SERIES_POLICY] =
            std::make_shared<CloudTimeSeriesCumulativeCompactionPolicy>();
    // Record engine start time for later use by startup-related logic.
    _startup_timepoint = std::chrono::system_clock::now();
}
// Ensure background threads and pools are torn down (via stop(), which is
// idempotent) before members are destroyed.
CloudStorageEngine::~CloudStorageEngine() {
    stop();
}
// Build an IOError describing a failed vault operation, including a dump of
// the vault's configuration (S3 conf string or serialized HDFS proto).
static Status vault_process_error(std::string_view id,
                                  std::variant<S3Conf, cloud::HdfsVaultInfo>& vault, Status err) {
    std::stringstream detail;
    if (auto* s3_conf = std::get_if<S3Conf>(&vault)) {
        detail << s3_conf->to_string();
    } else if (auto* hdfs_info = std::get_if<cloud::HdfsVaultInfo>(&vault)) {
        hdfs_info->SerializeToOstream(&detail);
    }
    return Status::IOError("Invalid vault, id {}, err {}, detail conf {}", id, err, detail.str());
}
// Visitor for std::visit over a vault's conf variant (S3Conf or HdfsVaultInfo):
// creates the matching file system and registers it as a storage resource
// under `id`. All reference members are borrowed; the caller must keep them
// alive for the visitor's lifetime.
struct VaultCreateFSVisitor {
    VaultCreateFSVisitor(const std::string& id, const cloud::StorageVaultPB_PathFormat& path_format,
                         bool check_fs)
            : id(id), path_format(path_format), check_fs(check_fs) {}
    // Create and register an S3 file system. When `check_fs` is set and an IAM
    // role is configured, probe connectivity once; failure is deliberately
    // fatal so a misconfigured vault is caught at startup (set
    // enable_check_storage_vault=false in be.conf to skip).
    Status operator()(const S3Conf& s3_conf) const {
        LOG(INFO) << "get new s3 info: " << s3_conf.to_string() << " resource_id=" << id
                  << " check_fs: " << check_fs;
        auto fs = DORIS_TRY(io::S3FileSystem::create(s3_conf, id));
        if (check_fs && !s3_conf.client_conf.role_arn.empty()) {
            bool res = false;
            // just check connectivity, not care object if exist
            auto st = fs->exists("not_exist_object", &res);
            if (!st.ok()) {
                LOG(FATAL) << "failed to check s3 fs, resource_id: " << id << " st: " << st
                           << "s3_conf: " << s3_conf.to_string()
                           << "add enable_check_storage_vault=false to be.conf to skip the check";
            }
        }
        put_storage_resource(id, {std::move(fs), path_format}, 0);
        LOG_INFO("successfully create s3 vault, vault id {}", id);
        return Status::OK();
    }
    // TODO(ByteYue): Make sure enable_java_support is on
    // Create and register an HDFS file system from the vault info. No
    // connectivity check is performed on this path.
    Status operator()(const cloud::HdfsVaultInfo& vault) const {
        auto hdfs_params = io::to_hdfs_params(vault);
        auto fs = DORIS_TRY(io::HdfsFileSystem::create(hdfs_params, hdfs_params.fs_name, id,
                                                       nullptr, vault.prefix()));
        put_storage_resource(id, {std::move(fs), path_format}, 0);
        LOG_INFO("successfully create hdfs vault, vault id {}", id);
        return Status::OK();
    }
    const std::string& id;                               // vault/resource id (borrowed)
    const cloud::StorageVaultPB_PathFormat& path_format; // borrowed
    bool check_fs;                                       // perform the startup connectivity probe
};
// Visitor for std::visit over a vault's conf variant, used when a file system
// for `id` already exists: refreshes the existing S3 client config in place,
// or (for HDFS) creates a new file system and re-registers the resource.
struct RefreshFSVaultVisitor {
    RefreshFSVaultVisitor(const std::string& id, io::FileSystemSPtr fs,
                          const cloud::StorageVaultPB_PathFormat& path_format)
            : id(id), fs(std::move(fs)), path_format(path_format) {}
    // Refresh S3 credentials/config on the existing client; the file system
    // object itself is reused.
    Status operator()(const S3Conf& s3_conf) const {
        DCHECK_EQ(fs->type(), io::FileSystemType::S3) << id;
        auto s3_fs = std::static_pointer_cast<io::S3FileSystem>(fs);
        auto client_holder = s3_fs->client_holder();
        auto st = client_holder->reset(s3_conf.client_conf);
        if (!st.ok()) {
            LOG(WARNING) << "failed to update s3 fs, resource_id=" << id << ": " << st;
        }
        return st;
    }
    // HDFS has no in-place reset: build a fresh file system and overwrite the
    // registered storage resource.
    Status operator()(const cloud::HdfsVaultInfo& vault) const {
        auto hdfs_params = io::to_hdfs_params(vault);
        auto hdfs_fs =
                DORIS_TRY(io::HdfsFileSystem::create(hdfs_params, hdfs_params.fs_name, id, nullptr,
                                                     vault.has_prefix() ? vault.prefix() : ""));
        auto hdfs = std::static_pointer_cast<io::HdfsFileSystem>(hdfs_fs);
        put_storage_resource(id, {std::move(hdfs), path_format}, 0);
        return Status::OK();
    }
    const std::string& id;                               // vault/resource id (borrowed)
    io::FileSystemSPtr fs;                               // existing FS being refreshed
    const cloud::StorageVaultPB_PathFormat& path_format; // borrowed
};
// Initialize executors, caches, helper managers and service thread pools.
// Must be called once before the engine is used; returns the first failing
// sub-component's error.
Status CloudStorageEngine::open() {
    // Fetch storage vault info up front so file systems are available early;
    // failures here are logged inside and do not abort open().
    sync_storage_vault();
    // TODO(plat1ko): DeleteBitmapTxnManager
    _memtable_flush_executor = std::make_unique<MemTableFlushExecutor>();
    // Use file cache disks number
    _memtable_flush_executor->init(
            cast_set<int32_t>(io::FileCacheFactory::instance()->get_cache_instance_size()));
    _calc_delete_bitmap_executor = std::make_unique<CalcDeleteBitmapExecutor>();
    _calc_delete_bitmap_executor->init(config::calc_delete_bitmap_max_thread);
    // The load path gets its own executor; fall back to half the cores (min 1)
    // when the dedicated config is not set.
    _calc_delete_bitmap_executor_for_load = std::make_unique<CalcDeleteBitmapExecutor>();
    _calc_delete_bitmap_executor_for_load->init(
            config::calc_delete_bitmap_for_load_max_thread > 0
                    ? config::calc_delete_bitmap_for_load_max_thread
                    : std::max(1, CpuInfo::num_cores() / 2));
    // The default cache is set to 100MB, use memory limit to dynamic adjustment
    bool is_percent = false;
    int64_t delete_bitmap_agg_cache_cache_limit =
            ParseUtil::parse_mem_spec(config::delete_bitmap_dynamic_agg_cache_limit,
                                      MemInfo::mem_limit(), MemInfo::physical_mem(), &is_percent);
    // Use the larger of the dynamically parsed limit and the static capacity.
    _txn_delete_bitmap_cache = std::make_unique<CloudTxnDeleteBitmapCache>(
            delete_bitmap_agg_cache_cache_limit > config::delete_bitmap_agg_cache_capacity
                    ? delete_bitmap_agg_cache_cache_limit
                    : config::delete_bitmap_agg_cache_capacity);
    RETURN_IF_ERROR(_txn_delete_bitmap_cache->init());
    _file_cache_block_downloader = std::make_unique<io::FileCacheBlockDownloader>(*this);
    _cloud_warm_up_manager = std::make_unique<CloudWarmUpManager>(*this);
    _tablet_hotspot = std::make_unique<TabletHotspot>();
    _cloud_snapshot_mgr = std::make_unique<CloudSnapshotMgr>(*this);
    // Stream load recorder lives on the first configured store path.
    RETURN_NOT_OK_STATUS_WITH_WARN(
            init_stream_load_recorder(ExecEnv::GetInstance()->store_paths()[0].path),
            "init StreamLoadRecorder failed");
    // check cluster id
    RETURN_NOT_OK_STATUS_WITH_WARN(_check_all_root_path_cluster_id(), "fail to check cluster id");
    RETURN_NOT_OK_STATUS_WITH_WARN(ThreadPoolBuilder("SyncLoadForTabletsThreadPool")
                                           .set_max_threads(config::sync_load_for_tablets_thread)
                                           .set_min_threads(config::sync_load_for_tablets_thread)
                                           .build(&_sync_load_for_tablets_thread_pool),
                                   "fail to build SyncLoadForTabletsThreadPool");
    RETURN_NOT_OK_STATUS_WITH_WARN(ThreadPoolBuilder("WarmupCacheAsyncThreadPool")
                                           .set_max_threads(config::warmup_cache_async_thread)
                                           .set_min_threads(config::warmup_cache_async_thread)
                                           .build(&_warmup_cache_async_thread_pool),
                                   "fail to build WarmupCacheAsyncThreadPool");
    return Status::OK();
}
// Stop the engine: signal and join background threads, then shut down all
// owned thread pools. Idempotent — subsequent calls are no-ops.
// Fix: the "is stopped" log line used to be emitted BEFORE the delete-bitmap
// pools were shut down; it is now emitted only after all teardown completes.
void CloudStorageEngine::stop() {
    if (_stopped) {
        return;
    }
    _stopped = true;
    // Wake every background loop waiting on the latch, then join them.
    _stop_background_threads_latch.count_down();
    for (auto&& t : _bg_threads) {
        if (t) {
            t->join();
        }
    }
    // NOTE(review): _evict_quering_rowset_thread (created in start_bg_threads)
    // is not joined here — presumably handled by the base class; confirm.
    if (_base_compaction_thread_pool) {
        _base_compaction_thread_pool->shutdown();
    }
    if (_cumu_compaction_thread_pool) {
        _cumu_compaction_thread_pool->shutdown();
    }
    if (_calc_tablet_delete_bitmap_task_thread_pool) {
        _calc_tablet_delete_bitmap_task_thread_pool->shutdown();
    }
    if (_sync_delete_bitmap_thread_pool) {
        _sync_delete_bitmap_thread_pool->shutdown();
    }
    LOG(INFO) << "Cloud storage engine is stopped.";
}
// Whether stop() has been called (or has started). Reads the _stopped flag.
bool CloudStorageEngine::stopped() {
    return _stopped;
}
// Look up a tablet by id via the cloud tablet manager and upcast the result to
// BaseTablet. `sync_stats` may be null; `force_use_only_cached` presumably
// restricts the lookup to already-cached tablets and `cache_on_miss` controls
// caching of a freshly fetched tablet — semantics live in CloudTabletMgr.
Result<BaseTabletSPtr> CloudStorageEngine::get_tablet(int64_t tablet_id,
                                                      SyncRowsetStats* sync_stats,
                                                      bool force_use_only_cached,
                                                      bool cache_on_miss) {
    return _tablet_mgr
            ->get_tablet(tablet_id, false, true, sync_stats, force_use_only_cached, cache_on_miss)
            .transform([](auto&& t) { return static_pointer_cast<BaseTablet>(std::move(t)); });
}
// Fetch the tablet meta for `tablet_id` into `*tablet_meta` from the cloud
// meta manager.
// NOTE(review): the cached-lookup fast path (and with it all handling of
// `force_use_only_cached`) is compiled out via `#if 0`, so every call goes to
// the meta manager — confirm this is intentional before relying on the flag.
Status CloudStorageEngine::get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tablet_meta,
                                           bool force_use_only_cached) {
    if (tablet_meta == nullptr) {
        return Status::InvalidArgument("tablet_meta output is null");
    }
#if 0
    if (_tablet_mgr && _tablet_mgr->peek_tablet_meta(tablet_id, tablet_meta)) {
        return Status::OK();
    }
    if (force_use_only_cached) {
        return Status::NotFound("tablet meta {} not found in cache", tablet_id);
    }
#endif
    if (_meta_mgr == nullptr) {
        return Status::InternalError("cloud meta manager is not initialized");
    }
    return _meta_mgr->get_tablet_meta(tablet_id, tablet_meta);
}
// Spawn all background threads and build the task thread pools. Threads stored
// in _bg_threads are joined by stop(); the evict-querying-rowset thread has its
// own member slot.
// NOTE(review): `wg_sptr` is not used anywhere in this body — confirm whether
// it is reserved for future use or dead.
Status CloudStorageEngine::start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr) {
    // Periodically refreshes storage vault (S3/HDFS) credentials and configs.
    RETURN_IF_ERROR(Thread::create(
            "CloudStorageEngine", "refresh_s3_info_thread",
            [this]() { this->_refresh_storage_vault_info_thread_callback(); },
            &_bg_threads.emplace_back()));
    LOG(INFO) << "refresh s3 info thread started";
    RETURN_IF_ERROR(Thread::create(
            "CloudStorageEngine", "vacuum_stale_rowsets_thread",
            [this]() { this->_vacuum_stale_rowsets_thread_callback(); },
            &_bg_threads.emplace_back()));
    LOG(INFO) << "vacuum stale rowsets thread started";
    RETURN_IF_ERROR(Thread::create(
            "CloudStorageEngine", "sync_tablets_thread",
            [this]() { this->_sync_tablets_thread_callback(); }, &_bg_threads.emplace_back()));
    LOG(INFO) << "sync tablets thread started";
    // Stored in its own member (not _bg_threads); joined elsewhere.
    RETURN_IF_ERROR(Thread::create(
            "CloudStorageEngine", "evict_querying_rowset_thread",
            [this]() { this->_evict_quring_rowset_thread_callback(); },
            &_evict_quering_rowset_thread));
    LOG(INFO) << "evict quering thread started";
    // add calculate tablet delete bitmap task thread pool
    RETURN_IF_ERROR(ThreadPoolBuilder("TabletCalDeleteBitmapThreadPool")
                            .set_min_threads(config::calc_tablet_delete_bitmap_task_max_thread)
                            .set_max_threads(config::calc_tablet_delete_bitmap_task_max_thread)
                            .build(&_calc_tablet_delete_bitmap_task_thread_pool));
    RETURN_IF_ERROR(ThreadPoolBuilder("SyncDeleteBitmapThreadPool")
                            .set_min_threads(config::sync_delete_bitmap_task_max_thread)
                            .set_max_threads(config::sync_delete_bitmap_task_max_thread)
                            .build(&_sync_delete_bitmap_thread_pool));
    // TODO(plat1ko): check_bucket_enable_versioning_thread
    // compaction tasks producer thread
    int base_thread_num = get_base_thread_num();
    int cumu_thread_num = get_cumu_thread_num();
    RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool")
                            .set_min_threads(base_thread_num)
                            .set_max_threads(base_thread_num)
                            .build(&_base_compaction_thread_pool));
    RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool")
                            .set_min_threads(cumu_thread_num)
                            .set_max_threads(cumu_thread_num)
                            .build(&_cumu_compaction_thread_pool));
    // Pools must exist before the producer thread starts, since it submits to them.
    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "compaction_tasks_producer_thread",
            [this]() { this->_compaction_tasks_producer_callback(); },
            &_bg_threads.emplace_back()));
    LOG(INFO) << "compaction tasks producer thread started,"
              << " base thread num " << base_thread_num << " cumu thread num " << cumu_thread_num;
    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "lease_compaction_thread",
            [this]() { this->_lease_compaction_thread_callback(); }, &_bg_threads.emplace_back()));
    LOG(INFO) << "lease compaction thread started";
    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "check_tablet_delete_bitmap_score_thread",
            [this]() { this->_check_tablet_delete_bitmap_score_callback(); },
            &_bg_threads.emplace_back()));
    LOG(INFO) << "check tablet delete bitmap score thread started";
    return Status::OK();
}
// Pull the latest storage vault list from the meta service and create or
// refresh the corresponding file systems. Called once during open() and then
// periodically by the refresh background thread.
// Fix: the per-vault failure log previously passed `std::move(st)` — the
// already-checked OK status from get_storage_vault_info — so the real error
// was dropped (and `st` was moved from repeatedly inside the loop). It now
// passes the per-vault `status`.
void CloudStorageEngine::sync_storage_vault() {
    cloud::StorageVaultInfos vault_infos;
    bool enable_storage_vault = false;
    auto st = _meta_mgr->get_storage_vault_info(&vault_infos, &enable_storage_vault);
    if (!st.ok()) {
        LOG(WARNING) << "failed to get storage vault info. err=" << st;
        return;
    }
    if (vault_infos.empty()) {
        LOG(WARNING) << "empty storage vault info";
        return;
    }
    bool check_storage_vault = false;
    bool expected = false;
    // Only the very first successful sync performs the (potentially fatal)
    // connectivity check; the CAS guarantees exactly-once across threads.
    if (first_sync_storage_vault.compare_exchange_strong(expected, true)) {
        check_storage_vault = config::enable_check_storage_vault;
        LOG(INFO) << "first sync storage vault info, BE try to check iam role connectivity, "
                     "check_storage_vault="
                  << check_storage_vault;
    }
    for (auto& [id, vault_info, path_format] : vault_infos) {
        auto fs = get_filesystem(id);
        // Unknown vault id -> create a new FS; known id -> refresh in place.
        auto status =
                (fs == nullptr)
                        ? std::visit(VaultCreateFSVisitor {id, path_format, check_storage_vault},
                                     vault_info)
                        : std::visit(RefreshFSVaultVisitor {id, std::move(fs), path_format},
                                     vault_info);
        if (!status.ok()) [[unlikely]] {
            // Report the per-vault failure, not the (successful) meta-service status.
            LOG(WARNING) << vault_process_error(id, vault_info, std::move(status));
        }
    }
    // When storage vault mode is off, the last vault in the list acts as the
    // default ("latest") file system.
    if (auto& id = std::get<0>(vault_infos.back());
        (latest_fs() == nullptr || latest_fs()->id() != id) && !enable_storage_vault) {
        set_latest_fs(get_filesystem(id));
    }
}
// We should enable_java_support if we want to use hdfs vault
void CloudStorageEngine::_refresh_storage_vault_info_thread_callback() {
while (!_stop_background_threads_latch.wait_for(
std::chrono::seconds(config::refresh_s3_info_interval_s))) {
sync_storage_vault();
}
}
// Background loop: vacuum stale rowsets every vacuum_stale_rowsets_interval_s
// seconds until the stop latch fires.
void CloudStorageEngine::_vacuum_stale_rowsets_thread_callback() {
    while (true) {
        if (_stop_background_threads_latch.wait_for(
                    std::chrono::seconds(config::vacuum_stale_rowsets_interval_s))) {
            break;
        }
        _tablet_mgr->vacuum_stale_rowsets(_stop_background_threads_latch);
    }
}
// Background loop: sync tablets every schedule_sync_tablets_interval_s seconds
// until the stop latch fires.
void CloudStorageEngine::_sync_tablets_thread_callback() {
    while (true) {
        if (_stop_background_threads_latch.wait_for(
                    std::chrono::seconds(config::schedule_sync_tablets_interval_s))) {
            break;
        }
        _tablet_mgr->sync_tablets(_stop_background_threads_latch);
    }
}
// Copy the list of cumulative compactions currently submitted for `tablet_id`
// into `res`; `res` is left untouched when there are none.
void CloudStorageEngine::get_cumu_compaction(
        int64_t tablet_id, std::vector<std::shared_ptr<CloudCumulativeCompaction>>& res) {
    std::lock_guard lock(_compaction_mtx);
    auto iter = _submitted_cumu_compactions.find(tablet_id);
    if (iter != _submitted_cumu_compactions.end()) {
        res = iter->second;
    }
}
// Resize the base and cumu compaction thread pools to match the current config
// (the thread-count configs can change at runtime). Returns an internal error
// when the pools have not been created yet.
// Refactor: the four nearly-identical max/min adjustment stanzas are folded
// into one local helper; log messages and behavior are unchanged.
Status CloudStorageEngine::_adjust_compaction_thread_num() {
    if (!_base_compaction_thread_pool || !_cumu_compaction_thread_pool) {
        LOG(WARNING) << "base or cumu compaction thread pool is not created";
        return Status::Error<ErrorCode::INTERNAL_ERROR, false>("");
    }
    // Bring one pool's max_threads/min_threads to `target`, logging each change.
    // Set max before min so growing a pool never violates max >= min.
    auto adjust_pool = [](ThreadPool* pool, int target, const std::string& name) {
        if (pool->max_threads() != target) {
            int old_max_threads = pool->max_threads();
            Status status = pool->set_max_threads(target);
            if (status.ok()) {
                VLOG_NOTICE << "update " << name << " compaction thread pool max_threads from "
                            << old_max_threads << " to " << target;
            }
        }
        if (pool->min_threads() != target) {
            int old_min_threads = pool->min_threads();
            Status status = pool->set_min_threads(target);
            if (status.ok()) {
                VLOG_NOTICE << "update " << name << " compaction thread pool min_threads from "
                            << old_min_threads << " to " << target;
            }
        }
    };
    adjust_pool(_base_compaction_thread_pool.get(), get_base_thread_num(), "base");
    adjust_pool(_cumu_compaction_thread_pool.get(), get_cumu_thread_num(), "cumu");
    return Status::OK();
}
// Background producer loop: alternates between cumulative and base compaction
// rounds, picks candidate tablets, and submits compaction tasks until the
// engine stops (or _adjust_compaction_thread_num fails, which exits the loop).
void CloudStorageEngine::_compaction_tasks_producer_callback() {
    LOG(INFO) << "try to start compaction producer process!";
    int round = 0;
    CompactionType compaction_type;
    // Used to record the time when the score metric was last updated.
    // The update of the score metric is accompanied by the logic of selecting the tablet.
    // If there is no slot available, the logic of selecting the tablet will be terminated,
    // which causes the score metric update to be terminated.
    // In order to avoid this situation, we need to update the score regularly.
    int64_t last_cumulative_score_update_time = 0;
    int64_t last_base_score_update_time = 0;
    static const int64_t check_score_interval_ms = 5000; // 5 secs
    int64_t interval = config::generate_compaction_tasks_interval_ms;
    do {
        int64_t cur_time = UnixMillis();
        if (!config::disable_auto_compaction) {
            // Keep pool sizes in sync with (possibly changed) config values.
            Status st = _adjust_compaction_thread_num();
            if (!st.ok()) {
                break;
            }
            bool check_score = false;
            // Run N cumulative rounds for each base round (N from config).
            if (round < config::cumulative_compaction_rounds_for_each_base_compaction_round) {
                compaction_type = CompactionType::CUMULATIVE_COMPACTION;
                round++;
                if (cur_time - last_cumulative_score_update_time >= check_score_interval_ms) {
                    check_score = true;
                    last_cumulative_score_update_time = cur_time;
                }
            } else {
                compaction_type = CompactionType::BASE_COMPACTION;
                round = 0;
                if (cur_time - last_base_score_update_time >= check_score_interval_ms) {
                    check_score = true;
                    last_base_score_update_time = cur_time;
                }
            }
            std::unique_ptr<ThreadPool>& thread_pool =
                    (compaction_type == CompactionType::CUMULATIVE_COMPACTION)
                            ? _cumu_compaction_thread_pool
                            : _base_compaction_thread_pool;
            VLOG_CRITICAL << "compaction thread pool. type: "
                          << (compaction_type == CompactionType::CUMULATIVE_COMPACTION ? "CUMU"
                                                                                       : "BASE")
                          << ", num_threads: " << thread_pool->num_threads()
                          << ", num_threads_pending_start: "
                          << thread_pool->num_threads_pending_start()
                          << ", num_active_threads: " << thread_pool->num_active_threads()
                          << ", max_threads: " << thread_pool->max_threads()
                          << ", min_threads: " << thread_pool->min_threads()
                          << ", num_total_queued_tasks: " << thread_pool->get_queue_size();
            std::vector<CloudTabletSPtr> tablets_compaction =
                    _generate_cloud_compaction_tasks(compaction_type, check_score);
            /// Regardless of whether the tablet is submitted for compaction or not,
            /// we need to call 'reset_compaction' to clean up the base_compaction or cumulative_compaction objects
            /// in the tablet, because these two objects store the tablet's own shared_ptr.
            /// If it is not cleaned up, the reference count of the tablet will always be greater than 1,
            /// thus cannot be collected by the garbage collector. (TabletManager::start_trash_sweep)
            for (const auto& tablet : tablets_compaction) {
                Status status = submit_compaction_task(tablet, compaction_type);
                if (status.ok()) continue;
                // "No suitable version" is an expected outcome; only surface it
                // at WARNING level when verbose debugging is on.
                if ((!status.is<ErrorCode::BE_NO_SUITABLE_VERSION>() &&
                     !status.is<ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION>()) ||
                    VLOG_DEBUG_IS_ON) {
                    LOG(WARNING) << "failed to submit compaction task for tablet: "
                                 << tablet->tablet_id() << ", err: " << status;
                }
            }
            interval = config::generate_compaction_tasks_interval_ms;
        } else {
            // Auto compaction disabled: poll the switch at a slower cadence.
            interval = config::check_auto_compaction_interval_seconds * 1000;
        }
        int64_t end_time = UnixMillis();
        DorisMetrics::instance()->compaction_producer_callback_a_round_time->set_value(end_time -
                                                                                       cur_time);
    } while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(interval)));
}
// Remove the tablet's entry from the index-change compaction registry for the
// given compaction kind (base vs cumulative). No-op if it was not registered.
void CloudStorageEngine::unregister_index_change_compaction(int64_t tablet_id,
                                                            bool is_base_compact) {
    std::lock_guard lock(_compaction_mtx);
    auto& registry = is_base_compact ? _submitted_index_change_base_compaction
                                     : _submitted_index_change_cumu_compaction;
    registry.erase(tablet_id);
}
// Try to register an index-change compaction for `tablet_id`. Registration is
// refused when a conflicting compaction is already tracked for the tablet; in
// that case `err_reason` receives a "reason:a, b, c" string of the three
// conflict flags (as 0/1) and false is returned.
bool CloudStorageEngine::register_index_change_compaction(
        std::shared_ptr<CloudIndexChangeCompaction> compact, int64_t tablet_id,
        bool is_base_compact, std::string& err_reason) {
    std::lock_guard lock(_compaction_mtx);
    if (is_base_compact) {
        // Conflicts: running base compaction, full compaction, or another
        // index-change base compaction on this tablet.
        bool has_base = _submitted_base_compactions.contains(tablet_id);
        bool has_full = _submitted_full_compactions.contains(tablet_id);
        bool has_idx_base = _submitted_index_change_base_compaction.contains(tablet_id);
        if (has_base || has_full || has_idx_base) {
            std::stringstream ss;
            ss << "reason:" << ((int)has_base) << ", " << ((int)has_full) << ", "
               << ((int)has_idx_base);
            err_reason = ss.str();
            return false;
        }
        _submitted_index_change_base_compaction[tablet_id] = compact;
        return true;
    }
    // Conflicts: a cumu compaction being prepared, a submitted cumu compaction,
    // or another index-change cumu compaction on this tablet.
    bool is_preparing = _tablet_preparing_cumu_compaction.contains(tablet_id);
    bool has_cumu = _submitted_cumu_compactions.contains(tablet_id);
    bool has_idx_cumu = _submitted_index_change_cumu_compaction.contains(tablet_id);
    if (is_preparing || has_cumu || has_idx_cumu) {
        std::stringstream ss;
        ss << "reason:" << ((int)is_preparing) << ", " << ((int)has_cumu) << ", "
           << ((int)has_idx_cumu);
        err_reason = ss.str();
        return false;
    }
    _submitted_index_change_cumu_compaction[tablet_id] = compact;
    return true;
}
// Pick the top-N tablets to compact for the given compaction type. N is the
// number of free "slots" derived from the per-disk task budget minus already
// running cumu/base/full tasks. When `check_score` is set, the candidate scan
// still runs even with no free slots so the max-compaction-score metric stays
// fresh (no tablets are returned in that case).
std::vector<CloudTabletSPtr> CloudStorageEngine::_generate_cloud_compaction_tasks(
        CompactionType compaction_type, bool check_score) {
    std::vector<std::shared_ptr<CloudTablet>> tablets_compaction;
    int64_t max_compaction_score = 0;
    std::unordered_set<int64_t> tablet_preparing_cumu_compaction;
    std::unordered_map<int64_t, std::vector<std::shared_ptr<CloudCumulativeCompaction>>>
            submitted_cumu_compactions;
    std::unordered_map<int64_t, std::shared_ptr<CloudBaseCompaction>> submitted_base_compactions;
    std::unordered_map<int64_t, std::shared_ptr<CloudFullCompaction>> submitted_full_compactions;
    std::unordered_map<int64_t, std::shared_ptr<CloudIndexChangeCompaction>>
            submitted_index_change_cumu_compactions;
    std::unordered_map<int64_t, std::shared_ptr<CloudIndexChangeCompaction>>
            submitted_index_change_base_compactions;
    {
        // Snapshot all registries under the lock, then work lock-free below.
        std::lock_guard lock(_compaction_mtx);
        tablet_preparing_cumu_compaction = _tablet_preparing_cumu_compaction;
        submitted_cumu_compactions = _submitted_cumu_compactions;
        submitted_base_compactions = _submitted_base_compactions;
        submitted_full_compactions = _submitted_full_compactions;
        submitted_index_change_cumu_compactions = _submitted_index_change_cumu_compaction;
        submitted_index_change_base_compactions = _submitted_index_change_base_compaction;
    }
    bool need_pick_tablet = true;
    int thread_per_disk =
            config::compaction_task_num_per_fast_disk; // all disks are fast in cloud mode
    // Count running tasks: cumu entries are vectors (parallel cumu possible).
    int num_cumu =
            std::accumulate(submitted_cumu_compactions.begin(), submitted_cumu_compactions.end(), 0,
                            [](int a, auto& b) { return a + b.second.size(); });
    int num_base =
            cast_set<int>(submitted_base_compactions.size() + submitted_full_compactions.size());
    int n = thread_per_disk - num_cumu - num_base;
    if (compaction_type == CompactionType::BASE_COMPACTION) {
        // We need to reserve at least one thread for cumulative compaction,
        // because base compactions may take too long to complete, which may
        // leads to "too many rowsets" error.
        int base_n = std::min(config::max_base_compaction_task_num_per_disk, thread_per_disk - 1) -
                     num_base;
        n = std::min(base_n, n);
    }
    if (n <= 0) { // No threads available
        if (!check_score) return tablets_compaction;
        need_pick_tablet = false;
        n = 0;
    }
    // Return true for skipping compaction
    std::function<bool(CloudTablet*)> filter_out;
    if (compaction_type == CompactionType::BASE_COMPACTION) {
        // Skip tablets already doing base/full/index-change-base work, or not running.
        filter_out = [&submitted_base_compactions, &submitted_full_compactions,
                      &submitted_index_change_base_compactions](CloudTablet* t) {
            return submitted_base_compactions.contains(t->tablet_id()) ||
                   submitted_full_compactions.contains(t->tablet_id()) ||
                   submitted_index_change_base_compactions.contains(t->tablet_id()) ||
                   t->tablet_state() != TABLET_RUNNING;
        };
    } else if (config::enable_parallel_cumu_compaction) {
        // Parallel cumu allowed: only preparing/index-change entries block a tablet.
        filter_out = [&tablet_preparing_cumu_compaction,
                      &submitted_index_change_cumu_compactions](CloudTablet* t) {
            return tablet_preparing_cumu_compaction.contains(t->tablet_id()) ||
                   submitted_index_change_cumu_compactions.contains(t->tablet_id()) ||
                   (t->tablet_state() != TABLET_RUNNING &&
                    (!config::enable_new_tablet_do_compaction || t->alter_version() == -1));
        };
    } else {
        // Serial cumu: an already submitted cumu compaction also blocks the tablet.
        filter_out = [&tablet_preparing_cumu_compaction, &submitted_cumu_compactions,
                      &submitted_index_change_cumu_compactions](CloudTablet* t) {
            return tablet_preparing_cumu_compaction.contains(t->tablet_id()) ||
                   submitted_index_change_cumu_compactions.contains(t->tablet_id()) ||
                   submitted_cumu_compactions.contains(t->tablet_id()) ||
                   (t->tablet_state() != TABLET_RUNNING &&
                    (!config::enable_new_tablet_do_compaction || t->alter_version() == -1));
        };
    }
    // Even if need_pick_tablet is false, we still need to call find_best_tablet_to_compaction(),
    // So that we can update the max_compaction_score metric.
    do {
        std::vector<CloudTabletSPtr> tablets;
        auto st = tablet_mgr().get_topn_tablets_to_compact(n, compaction_type, filter_out, &tablets,
                                                           &max_compaction_score);
        if (!st.ok()) {
            LOG(WARNING) << "failed to get tablets to compact, err=" << st;
            break;
        }
        if (!need_pick_tablet) break;
        tablets_compaction = std::move(tablets);
    } while (false);
    // Publish the freshest observed max score for the relevant metric.
    if (max_compaction_score > 0) {
        if (compaction_type == CompactionType::BASE_COMPACTION) {
            DorisMetrics::instance()->tablet_base_max_compaction_score->set_value(
                    max_compaction_score);
        } else {
            DorisMetrics::instance()->tablet_cumulative_max_compaction_score->set_value(
                    max_compaction_score);
        }
    }
    return tablets_compaction;
}
// Acquire the meta-service global compaction lock for `compaction` and record
// it in the matching "executing" registry. On lock failure, stamp the tablet's
// last-failure time for that compaction kind and return the error.
// Fix: corrected "compactoin" -> "compaction" in the three warning logs and
// "unsupport" -> "unsupported" in the fallback log/Status message.
Status CloudStorageEngine::_request_tablet_global_compaction_lock(
        ReaderType compaction_type, const CloudTabletSPtr& tablet,
        std::shared_ptr<CloudCompactionMixin> compaction) {
    long now = duration_cast<std::chrono::milliseconds>(
                       std::chrono::system_clock::now().time_since_epoch())
                       .count();
    if (compaction_type == ReaderType::READER_CUMULATIVE_COMPACTION) {
        auto cumu_compaction = static_pointer_cast<CloudCumulativeCompaction>(compaction);
        if (auto st = cumu_compaction->request_global_lock(); !st.ok()) {
            LOG_WARNING("failed to request cumu compaction global lock")
                    .tag("tablet id", tablet->tablet_id())
                    .tag("msg", st.to_string());
            tablet->set_last_cumu_compaction_failure_time(now);
            return st;
        }
        {
            std::lock_guard lock(_compaction_mtx);
            // Multiple cumu compactions may run on one tablet, hence push_back.
            _executing_cumu_compactions[tablet->tablet_id()].push_back(cumu_compaction);
        }
        return Status::OK();
    } else if (compaction_type == ReaderType::READER_BASE_COMPACTION) {
        auto base_compaction = static_pointer_cast<CloudBaseCompaction>(compaction);
        if (auto st = base_compaction->request_global_lock(); !st.ok()) {
            LOG_WARNING("failed to request base compaction global lock")
                    .tag("tablet id", tablet->tablet_id())
                    .tag("msg", st.to_string());
            tablet->set_last_base_compaction_failure_time(now);
            return st;
        }
        {
            std::lock_guard lock(_compaction_mtx);
            _executing_base_compactions[tablet->tablet_id()] = base_compaction;
        }
        return Status::OK();
    } else if (compaction_type == ReaderType::READER_FULL_COMPACTION) {
        auto full_compaction = static_pointer_cast<CloudFullCompaction>(compaction);
        if (auto st = full_compaction->request_global_lock(); !st.ok()) {
            LOG_WARNING("failed to request full compaction global lock")
                    .tag("tablet id", tablet->tablet_id())
                    .tag("msg", st.to_string());
            tablet->set_last_full_compaction_failure_time(now);
            return st;
        }
        {
            std::lock_guard lock(_compaction_mtx);
            _executing_full_compactions[tablet->tablet_id()] = full_compaction;
        }
        return Status::OK();
    } else {
        LOG(WARNING) << "unsupported compaction task for tablet: " << tablet->tablet_id()
                     << ", compaction name: " << compaction->compaction_name();
        return Status::NotFound("Unsupported compaction type {}", compaction->compaction_name());
    }
}
// Prepare and submit a base compaction task for `tablet` to the base
// compaction pool. A nullptr placeholder is first inserted into
// _submitted_base_compactions so concurrent submitters are rejected while
// prepare_compact() runs outside the lock; the placeholder is replaced with
// the real object on success and erased on any failure path.
Status CloudStorageEngine::_submit_base_compaction_task(const CloudTabletSPtr& tablet) {
    using namespace std::chrono;
    {
        std::lock_guard lock(_compaction_mtx);
        // Take a placeholder for base compaction
        auto [_, success] = _submitted_base_compactions.emplace(tablet->tablet_id(), nullptr);
        if (!success) {
            return Status::AlreadyExist(
                    "other base compaction or full compaction is submitted, tablet_id={}",
                    tablet->tablet_id());
        }
    }
    auto compaction = std::make_shared<CloudBaseCompaction>(*this, tablet);
    auto st = compaction->prepare_compact();
    if (!st.ok()) {
        // Preparation failed: record failure time and release the placeholder.
        long now = duration_cast<std::chrono::milliseconds>(
                           std::chrono::system_clock::now().time_since_epoch())
                           .count();
        tablet->set_last_base_compaction_failure_time(now);
        std::lock_guard lock(_compaction_mtx);
        _submitted_base_compactions.erase(tablet->tablet_id());
        return st;
    }
    {
        // Replace the placeholder with the real compaction object.
        std::lock_guard lock(_compaction_mtx);
        _submitted_base_compactions[tablet->tablet_id()] = compaction;
    }
    // The task lambda owns `compaction` (moved into the capture) so it outlives
    // this function; `this` and `tablet` are captured by copy of the shared_ptr.
    st = _base_compaction_thread_pool->submit_func([=, this, compaction = std::move(compaction)]() {
        DorisMetrics::instance()->base_compaction_task_running_total->increment(1);
        DorisMetrics::instance()->base_compaction_task_pending_total->set_value(
                _base_compaction_thread_pool->get_queue_size());
        g_base_compaction_running_task_count << 1;
        signal::tablet_id = tablet->tablet_id();
        // Defer: always unregister from the submitted map and update metrics,
        // whatever path the task exits through.
        Defer defer {[&]() {
            g_base_compaction_running_task_count << -1;
            std::lock_guard lock(_compaction_mtx);
            _submitted_base_compactions.erase(tablet->tablet_id());
            DorisMetrics::instance()->base_compaction_task_running_total->increment(-1);
            DorisMetrics::instance()->base_compaction_task_pending_total->set_value(
                    _base_compaction_thread_pool->get_queue_size());
        }};
        // Global lock acquisition registers the task in _executing_base_compactions.
        auto st = _request_tablet_global_compaction_lock(ReaderType::READER_BASE_COMPACTION, tablet,
                                                         compaction);
        if (!st.ok()) return;
        st = compaction->execute_compact();
        if (!st.ok()) {
            // Error log has been output in `execute_compact`
            long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
            tablet->set_last_base_compaction_failure_time(now);
        }
        std::lock_guard lock(_compaction_mtx);
        _executing_base_compactions.erase(tablet->tablet_id());
    });
    DorisMetrics::instance()->base_compaction_task_pending_total->set_value(
            _base_compaction_thread_pool->get_queue_size());
    if (!st.ok()) {
        // Pool rejected the task: roll back the registration.
        std::lock_guard lock(_compaction_mtx);
        _submitted_base_compactions.erase(tablet->tablet_id());
        return Status::InternalError("failed to submit base compaction, tablet_id={}",
                                     tablet->tablet_id());
    }
    return st;
}
// Submits a cumulative compaction task on `tablet` to `_cumu_compaction_thread_pool`.
//
// Bookkeeping maps (all guarded by `_compaction_mtx`):
//   * `_tablet_preparing_cumu_compaction` - tablets whose compaction is being prepared;
//   * `_submitted_cumu_compactions`       - compactions queued or running, per tablet;
//   * `_executing_cumu_compactions`       - compactions past the global compaction lock
//     (populated elsewhere; erased here via `erase_executing_cumu_compaction`).
//
// Inside the pool task, a "large" compaction (by input bytes/rows thresholds) may be
// rejected and retried later when the pool is under pressure, so that small tasks are
// not starved.
//
// Returns OK once the task is queued; AlreadyExist if another cumu compaction on the
// same tablet blocks this one; InternalError if the thread pool rejects the task;
// otherwise the error from `prepare_compact()`.
Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletSPtr& tablet) {
    using namespace std::chrono;
    {
        std::lock_guard lock(_compaction_mtx);
        // Unless parallel cumu compaction is enabled, at most one submitted cumu
        // compaction may exist per tablet.
        if (!config::enable_parallel_cumu_compaction &&
            _submitted_cumu_compactions.count(tablet->tablet_id())) {
            return Status::AlreadyExist("other cumu compaction is submitted, tablet_id={}",
                                        tablet->tablet_id());
        }
        // Only one caller may be preparing a cumu compaction for a tablet at a time.
        auto [_, success] = _tablet_preparing_cumu_compaction.insert(tablet->tablet_id());
        if (!success) {
            return Status::AlreadyExist("other cumu compaction is preparing, tablet_id={}",
                                        tablet->tablet_id());
        }
    }
    auto compaction = std::make_shared<CloudCumulativeCompaction>(*this, tablet);
    auto st = compaction->prepare_compact();
    if (!st.ok()) {
        long now = duration_cast<std::chrono::milliseconds>(
                           std::chrono::system_clock::now().time_since_epoch())
                           .count();
        if (!st.is<ErrorCode::CUMULATIVE_MEET_DELETE_VERSION>()) {
            if (st.is<ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION>()) {
                // Backoff strategy if no suitable version
                tablet->last_cumu_no_suitable_version_ms = now;
            } else {
                tablet->set_last_cumu_compaction_failure_time(now);
            }
        }
        // Preparation failed: release the "preparing" mark and bail out.
        std::lock_guard lock(_compaction_mtx);
        _tablet_preparing_cumu_compaction.erase(tablet->tablet_id());
        return st;
    }
    {
        // Preparation succeeded: move the tablet from "preparing" to "submitted".
        std::lock_guard lock(_compaction_mtx);
        _tablet_preparing_cumu_compaction.erase(tablet->tablet_id());
        _submitted_cumu_compactions[tablet->tablet_id()].push_back(compaction);
    }
    // Removes exactly this compaction instance from `_submitted_cumu_compactions`.
    auto erase_submitted_cumu_compaction = [=, this]() {
        std::lock_guard lock(_compaction_mtx);
        auto it = _submitted_cumu_compactions.find(tablet->tablet_id());
        DCHECK(it != _submitted_cumu_compactions.end());
        auto& compactions = it->second;
        auto it1 = std::find(compactions.begin(), compactions.end(), compaction);
        DCHECK(it1 != compactions.end());
        compactions.erase(it1);
        if (compactions.empty()) { // No compactions on this tablet, erase key
            _submitted_cumu_compactions.erase(it);
            // No cumu compaction on this tablet, reset `last_cumu_no_suitable_version_ms` to enable this tablet to
            // enter the compaction scheduling candidate set. The purpose of doing this is to have at least one BE perform
            // cumu compaction on tablet which has suitable versions for cumu compaction.
            tablet->last_cumu_no_suitable_version_ms = 0;
        }
    };
    // Removes exactly this compaction instance from `_executing_cumu_compactions`.
    auto erase_executing_cumu_compaction = [=, this]() {
        std::lock_guard lock(_compaction_mtx);
        auto it = _executing_cumu_compactions.find(tablet->tablet_id());
        DCHECK(it != _executing_cumu_compactions.end());
        auto& compactions = it->second;
        auto it1 = std::find(compactions.begin(), compactions.end(), compaction);
        DCHECK(it1 != compactions.end());
        compactions.erase(it1);
        if (compactions.empty()) { // No compactions on this tablet, erase key
            _executing_cumu_compactions.erase(it);
            // No cumu compaction on this tablet, reset `last_cumu_no_suitable_version_ms` to enable this tablet to
            // enter the compaction scheduling candidate set. The purpose of doing this is to have at least one BE perform
            // cumu compaction on tablet which has suitable versions for cumu compaction.
            tablet->last_cumu_no_suitable_version_ms = 0;
        }
    };
    st = _cumu_compaction_thread_pool->submit_func([=, this, compaction = std::move(compaction)]() {
        DorisMetrics::instance()->cumulative_compaction_task_running_total->increment(1);
        DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value(
                _cumu_compaction_thread_pool->get_queue_size());
        DBUG_EXECUTE_IF("CloudStorageEngine._submit_cumulative_compaction_task.wait_in_line",
                        { sleep(5); })
        signal::tablet_id = tablet->tablet_id();
        g_cumu_compaction_running_task_count << 1;
        bool is_large_task = true;
        // Runs on every exit path of the task: undoes the thread accounting taken in
        // the do-while below and drops this compaction from the "submitted" map.
        Defer defer {[&]() {
            DBUG_EXECUTE_IF("CloudStorageEngine._submit_cumulative_compaction_task.sleep",
                            { sleep(5); })
            std::lock_guard lock(_cumu_compaction_delay_mtx);
            _cumu_compaction_thread_pool_used_threads--;
            if (!is_large_task) {
                _cumu_compaction_thread_pool_small_tasks_running--;
            }
            g_cumu_compaction_running_task_count << -1;
            erase_submitted_cumu_compaction();
            DorisMetrics::instance()->cumulative_compaction_task_running_total->increment(-1);
            DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value(
                    _cumu_compaction_thread_pool->get_queue_size());
        }};
        auto st = _request_tablet_global_compaction_lock(ReaderType::READER_CUMULATIVE_COMPACTION,
                                                         tablet, compaction);
        if (!st.ok()) return;
        // Large-task admission control: when the pool is configured for it, delay
        // large compactions while the pool is intensively used, so small tasks keep
        // making progress.
        do {
            std::lock_guard lock(_cumu_compaction_delay_mtx);
            _cumu_compaction_thread_pool_used_threads++;
            if (config::large_cumu_compaction_task_min_thread_num > 1 &&
                _cumu_compaction_thread_pool->max_threads() >=
                        config::large_cumu_compaction_task_min_thread_num) {
                // Determine if this is a small task based on configured thresholds
                is_large_task = (compaction->get_input_rowsets_bytes() >
                                         config::large_cumu_compaction_task_bytes_threshold ||
                                 compaction->get_input_num_rows() >
                                         config::large_cumu_compaction_task_row_num_threshold);
                // Small task. No delay needed
                if (!is_large_task) {
                    _cumu_compaction_thread_pool_small_tasks_running++;
                    break;
                }
                // Deal with large task
                if (_should_delay_large_task()) {
                    long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch())
                                       .count();
                    // sleep 5s for this tablet
                    tablet->set_last_cumu_compaction_failure_time(now);
                    erase_executing_cumu_compaction();
                    LOG_WARNING(
                            "failed to do CloudCumulativeCompaction, cumu thread pool is "
                            "intensive, delay large task.")
                            .tag("tablet_id", tablet->tablet_id())
                            .tag("input_rows", compaction->get_input_num_rows())
                            .tag("input_rowsets_total_size", compaction->get_input_rowsets_bytes())
                            .tag("config::large_cumu_compaction_task_bytes_threshold",
                                 config::large_cumu_compaction_task_bytes_threshold)
                            .tag("config::large_cumu_compaction_task_row_num_threshold",
                                 config::large_cumu_compaction_task_row_num_threshold)
                            .tag("remaining threads", _cumu_compaction_thread_pool_used_threads)
                            .tag("small_tasks_running",
                                 _cumu_compaction_thread_pool_small_tasks_running);
                    return;
                }
            }
        } while (false);
        st = compaction->execute_compact();
        if (!st.ok()) {
            // Error log has been output in `execute_compact`
            long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
            tablet->set_last_cumu_compaction_failure_time(now);
        }
        erase_executing_cumu_compaction();
    });
    DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value(
            _cumu_compaction_thread_pool->get_queue_size());
    if (!st.ok()) {
        // Thread pool rejected the task; roll back the "submitted" entry.
        erase_submitted_cumu_compaction();
        return Status::InternalError("failed to submit cumu compaction, tablet_id={}",
                                     tablet->tablet_id());
    }
    return st;
}
// Submits a full compaction task on `tablet` to `_base_compaction_thread_pool`
// (full compaction shares the base compaction pool).
//
// `_submitted_full_compactions` (guarded by `_compaction_mtx`) holds a nullptr
// placeholder while preparing, then the real compaction once prepared; the entry is
// erased on any failure and when the pool task finishes.
//
// Returns OK once the task is queued; AlreadyExist if a full/base compaction already
// occupies the slot; InternalError if the thread pool rejects the task; otherwise the
// error from `prepare_compact()`.
Status CloudStorageEngine::_submit_full_compaction_task(const CloudTabletSPtr& tablet) {
    using namespace std::chrono;
    {
        std::lock_guard lock(_compaction_mtx);
        // Take a placeholder for full compaction
        auto [_, success] = _submitted_full_compactions.emplace(tablet->tablet_id(), nullptr);
        if (!success) {
            return Status::AlreadyExist(
                    "other full compaction or base compaction is submitted, tablet_id={}",
                    tablet->tablet_id());
        }
    }
    auto compaction = std::make_shared<CloudFullCompaction>(*this, tablet);
    auto st = compaction->prepare_compact();
    if (!st.ok()) {
        // Record the failure time (used by scheduling backoff) and free the slot.
        long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
        tablet->set_last_full_compaction_failure_time(now);
        std::lock_guard lock(_compaction_mtx);
        _submitted_full_compactions.erase(tablet->tablet_id());
        return st;
    }
    {
        // Replace the placeholder with the prepared compaction.
        std::lock_guard lock(_compaction_mtx);
        _submitted_full_compactions[tablet->tablet_id()] = compaction;
    }
    st = _base_compaction_thread_pool->submit_func([=, this, compaction = std::move(compaction)]() {
        g_full_compaction_running_task_count << 1;
        signal::tablet_id = tablet->tablet_id();
        // Runs on every exit path: drop the running counter and the "submitted" entry.
        Defer defer {[&]() {
            g_full_compaction_running_task_count << -1;
            std::lock_guard lock(_compaction_mtx);
            _submitted_full_compactions.erase(tablet->tablet_id());
        }};
        auto st = _request_tablet_global_compaction_lock(ReaderType::READER_FULL_COMPACTION, tablet,
                                                         compaction);
        if (!st.ok()) return;
        st = compaction->execute_compact();
        if (!st.ok()) {
            // Error log has been output in `execute_compact`
            long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
            tablet->set_last_full_compaction_failure_time(now);
        }
        std::lock_guard lock(_compaction_mtx);
        _executing_full_compactions.erase(tablet->tablet_id());
    });
    if (!st.ok()) {
        // Thread pool rejected the task; roll back the "submitted" entry.
        std::lock_guard lock(_compaction_mtx);
        _submitted_full_compactions.erase(tablet->tablet_id());
        return Status::InternalError("failed to submit full compaction, tablet_id={}",
                                     tablet->tablet_id());
    }
    return st;
}
// Dispatches a compaction request for `tablet` to the submitter matching
// `compaction_type`. Supported types are BASE, CUMULATIVE and FULL compaction;
// any other value yields InternalError.
Status CloudStorageEngine::submit_compaction_task(const CloudTabletSPtr& tablet,
                                                  CompactionType compaction_type) {
    DCHECK(compaction_type == CompactionType::CUMULATIVE_COMPACTION ||
           compaction_type == CompactionType::BASE_COMPACTION ||
           compaction_type == CompactionType::FULL_COMPACTION);
    if (compaction_type == CompactionType::BASE_COMPACTION) {
        RETURN_IF_ERROR(_submit_base_compaction_task(tablet));
        return Status::OK();
    }
    if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) {
        RETURN_IF_ERROR(_submit_cumulative_compaction_task(tablet));
        return Status::OK();
    }
    if (compaction_type == CompactionType::FULL_COMPACTION) {
        RETURN_IF_ERROR(_submit_full_compaction_task(tablet));
        return Status::OK();
    }
    return Status::InternalError("unknown compaction type!");
}
void CloudStorageEngine::_lease_compaction_thread_callback() {
while (!_stop_background_threads_latch.wait_for(
std::chrono::seconds(config::lease_compaction_interval_seconds))) {
std::vector<std::shared_ptr<CloudFullCompaction>> full_compactions;
std::vector<std::shared_ptr<CloudBaseCompaction>> base_compactions;
std::vector<std::shared_ptr<CloudCumulativeCompaction>> cumu_compactions;
std::vector<std::shared_ptr<CloudCompactionStopToken>> compation_stop_tokens;
std::vector<std::shared_ptr<CloudIndexChangeCompaction>> index_change_compations;
{
std::lock_guard lock(_compaction_mtx);
for (auto& [_, base] : _executing_base_compactions) {
if (base) { // `base` might be a nullptr placeholder
base_compactions.push_back(base);
}
}
for (auto& [_, cumus] : _executing_cumu_compactions) {
for (auto& cumu : cumus) {
cumu_compactions.push_back(cumu);
}
}
for (auto& [_, full] : _executing_full_compactions) {
if (full) {
full_compactions.push_back(full);
}
}
for (auto& [_, stop_token] : _active_compaction_stop_tokens) {
if (stop_token) {
compation_stop_tokens.push_back(stop_token);
}
}
for (auto& [_, index_change] : _submitted_index_change_cumu_compaction) {
if (index_change) {
index_change_compations.push_back(index_change);
}
}
for (auto& [_, index_change] : _submitted_index_change_base_compaction) {
if (index_change) {
index_change_compations.push_back(index_change);
}
}
}
// TODO(plat1ko): Support batch lease rpc
for (auto& stop_token : compation_stop_tokens) {
stop_token->do_lease();
}
for (auto& comp : full_compactions) {
comp->do_lease();
}
for (auto& comp : cumu_compactions) {
comp->do_lease();
}
for (auto& comp : base_compactions) {
comp->do_lease();
}
for (auto& comp : index_change_compations) {
comp->do_lease();
}
}
}
void CloudStorageEngine::_check_tablet_delete_bitmap_score_callback() {
LOG(INFO) << "try to start check tablet delete bitmap score!";
while (!_stop_background_threads_latch.wait_for(
std::chrono::seconds(config::check_tablet_delete_bitmap_interval_seconds))) {
if (!config::enable_check_tablet_delete_bitmap_score) {
return;
}
uint64_t max_delete_bitmap_score = 0;
uint64_t max_base_rowset_delete_bitmap_score = 0;
tablet_mgr().get_topn_tablet_delete_bitmap_score(&max_delete_bitmap_score,
&max_base_rowset_delete_bitmap_score);
if (max_delete_bitmap_score > 0) {
_tablet_max_delete_bitmap_score_metrics->set_value(max_delete_bitmap_score);
}
if (max_base_rowset_delete_bitmap_score > 0) {
_tablet_max_base_rowset_delete_bitmap_score_metrics->set_value(
max_base_rowset_delete_bitmap_score);
}
}
}
// Renders the currently submitted compactions as a pretty-printed JSON object:
//   {"CumulativeCompaction": [tablet_id, ...], "BaseCompaction": [tablet_id, ...]}
// A tablet id appears once per submitted cumu compaction task, so parallel cumu
// compactions on one tablet produce duplicate entries. Base entries are emitted per
// map slot (including nullptr placeholders taken during preparation).
//
// Fix: the inner counter was `int` compared against `v.size()` (size_t), a
// signed/unsigned mismatch; use size_t.
Status CloudStorageEngine::get_compaction_status_json(std::string* result) {
    rapidjson::Document root;
    root.SetObject();
    std::lock_guard lock(_compaction_mtx);
    // cumu
    std::string_view cumu = "CumulativeCompaction";
    rapidjson::Value cumu_key;
    cumu_key.SetString(cumu.data(), cast_set<uint32_t>(cumu.length()), root.GetAllocator());
    rapidjson::Document cumu_arr;
    cumu_arr.SetArray();
    for (auto& [tablet_id, v] : _submitted_cumu_compactions) {
        // One array element per submitted compaction on this tablet.
        for (size_t i = 0; i < v.size(); ++i) {
            cumu_arr.PushBack(tablet_id, root.GetAllocator());
        }
    }
    root.AddMember(cumu_key, cumu_arr, root.GetAllocator());
    // base
    std::string_view base = "BaseCompaction";
    rapidjson::Value base_key;
    base_key.SetString(base.data(), cast_set<uint32_t>(base.length()), root.GetAllocator());
    rapidjson::Document base_arr;
    base_arr.SetArray();
    for (auto& [tablet_id, _] : _submitted_base_compactions) {
        base_arr.PushBack(tablet_id, root.GetAllocator());
    }
    root.AddMember(base_key, base_arr, root.GetAllocator());
    rapidjson::StringBuffer strbuf;
    rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(strbuf);
    root.Accept(writer);
    *result = std::string(strbuf.GetString());
    return Status::OK();
}
// Returns the cumulative compaction policy registered under `compaction_policy`,
// falling back to the size-based policy when the name is unknown.
//
// Fix: the original performed two lookups (`contains` followed by `at`); a single
// `find` does the same with one traversal.
std::shared_ptr<CloudCumulativeCompactionPolicy> CloudStorageEngine::cumu_compaction_policy(
        std::string_view compaction_policy) {
    auto it = _cumulative_compaction_policies.find(compaction_policy);
    if (it == _cumulative_compaction_policies.end()) {
        // Unknown policy name: fall back to the default size-based policy.
        return _cumulative_compaction_policies.at(CUMULATIVE_SIZE_BASED_POLICY);
    }
    return it->second;
}
// Creates and registers a compaction stop token for `tablet`. A nullptr
// placeholder is inserted first under the lock so that only one caller can
// register per tablet; the real token replaces the placeholder once
// registration with the meta service succeeds, and the placeholder is rolled
// back on failure.
Status CloudStorageEngine::register_compaction_stop_token(CloudTabletSPtr tablet,
                                                          int64_t initiator) {
    const auto tablet_id = tablet->tablet_id();
    {
        // Reserve the slot; losing the race means a token is already active.
        std::lock_guard lock(_compaction_mtx);
        auto [_, inserted] = _active_compaction_stop_tokens.emplace(tablet_id, nullptr);
        if (!inserted) {
            return Status::AlreadyExist("stop token already exists for tablet_id={}", tablet_id);
        }
    }
    auto stop_token = std::make_shared<CloudCompactionStopToken>(*this, tablet, initiator);
    auto st = stop_token->do_register();
    if (!st.ok()) {
        // Registration with MS failed: undo the placeholder.
        std::lock_guard lock(_compaction_mtx);
        _active_compaction_stop_tokens.erase(tablet_id);
        return st;
    }
    {
        // Publish the real token in place of the placeholder.
        std::lock_guard lock(_compaction_mtx);
        _active_compaction_stop_tokens[tablet_id] = stop_token;
    }
    LOG_INFO(
            "successfully register compaction stop token for tablet_id={}, "
            "delete_bitmap_lock_initiator={}",
            tablet_id, initiator);
    return st;
}
// Removes the stop token for `tablet` from the local registry; when `clear_ms`
// is true, also unregisters it from the meta service. Returns NotFound if no
// token is currently registered for the tablet.
Status CloudStorageEngine::unregister_compaction_stop_token(CloudTabletSPtr tablet, bool clear_ms) {
    const auto tablet_id = tablet->tablet_id();
    std::shared_ptr<CloudCompactionStopToken> stop_token;
    {
        std::lock_guard lock(_compaction_mtx);
        auto it = _active_compaction_stop_tokens.find(tablet_id);
        if (it == _active_compaction_stop_tokens.end()) {
            return Status::NotFound("stop token not found for tablet_id={}", tablet_id);
        }
        // Keep a reference alive past the erase so we can still talk to MS below.
        stop_token = it->second;
        _active_compaction_stop_tokens.erase(it);
    }
    LOG_INFO("successfully unregister compaction stop token for tablet_id={}", tablet_id);
    if (stop_token && clear_ms) {
        RETURN_IF_ERROR(stop_token->do_unregister());
        LOG_INFO(
                "successfully remove compaction stop token from MS for tablet_id={}, "
                "delete_bitmap_lock_initiator={}",
                tablet_id, stop_token->initiator());
    }
    return Status::OK();
}
// Validates that every configured root path records the same cluster id and that it
// agrees with `config::cluster_id`; persists the configured id when no path has one
// yet.
//
// Fixes:
//  * `std::stoi` throws std::invalid_argument / std::out_of_range on a corrupt
//    cluster id file; that exception would escape a Status-returning function.
//    Convert it into Status::Corruption instead.
//  * Dropped the redundant `!cluster_ids.empty()` in the final check — the empty
//    case already returned above.
Status CloudStorageEngine::_check_all_root_path_cluster_id() {
    // Check if all root paths have the same cluster id
    std::set<int32_t> cluster_ids;
    for (const auto& path : _options.store_paths) {
        auto cluster_id_path = fmt::format("{}/{}", path.path, CLUSTER_ID_PREFIX);
        bool exists = false;
        RETURN_IF_ERROR(io::global_local_filesystem()->exists(cluster_id_path, &exists));
        if (!exists) {
            continue;
        }
        io::FileReaderSPtr reader;
        RETURN_IF_ERROR(io::global_local_filesystem()->open_file(cluster_id_path, &reader));
        size_t fsize = reader->size();
        if (fsize > 0) {
            std::string content;
            content.resize(fsize, '\0');
            size_t bytes_read = 0;
            RETURN_IF_ERROR(reader->read_at(0, {content.data(), fsize}, &bytes_read));
            DCHECK_EQ(fsize, bytes_read);
            try {
                cluster_ids.insert(std::stoi(content));
            } catch (const std::exception& e) {
                // A corrupt cluster id file must not crash the process.
                return Status::Corruption("invalid cluster id file {}: '{}' ({})",
                                          cluster_id_path, content, e.what());
            }
        }
    }
    _effective_cluster_id = config::cluster_id;
    // first init
    if (cluster_ids.empty()) {
        // not set configured cluster id
        if (_effective_cluster_id == -1) {
            return Status::OK();
        }
        // If no cluster id file exists, use the configured cluster id
        return set_cluster_id(_effective_cluster_id);
    }
    if (cluster_ids.size() > 1) {
        return Status::InternalError(
                "All root paths must have the same cluster id, but you have "
                "different cluster ids: {}",
                fmt::join(cluster_ids, ", "));
    }
    // Exactly one cluster id was found on disk; it must match the config (if set).
    if (_effective_cluster_id != -1 && *cluster_ids.begin() != _effective_cluster_id) {
        return Status::Corruption(
                "multiple cluster ids is not equal. config::cluster_id={}, "
                "storage path cluster_id={}",
                _effective_cluster_id, *cluster_ids.begin());
    }
    return Status::OK();
}
// Persists `cluster_id` into every configured root path that does not already
// contain a cluster id file, then records it as the effective cluster id.
// Existing cluster id files are never overwritten.
Status CloudStorageEngine::set_cluster_id(int32_t cluster_id) {
    std::lock_guard<std::mutex> l(_store_lock);
    const std::string id_text = std::to_string(cluster_id);
    for (const auto& path : _options.store_paths) {
        auto cluster_id_path = fmt::format("{}/{}", path.path, CLUSTER_ID_PREFIX);
        bool exists = false;
        RETURN_IF_ERROR(io::global_local_filesystem()->exists(cluster_id_path, &exists));
        if (exists) {
            // Keep whatever was written before; consistency is verified elsewhere.
            continue;
        }
        io::FileWriterPtr file_writer;
        RETURN_IF_ERROR(io::global_local_filesystem()->create_file(cluster_id_path, &file_writer));
        RETURN_IF_ERROR(file_writer->append(id_text));
        RETURN_IF_ERROR(file_writer->close());
    }
    _effective_cluster_id = cluster_id;
    return Status::OK();
}
#include "common/compile_check_end.h"
} // namespace doris