| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "recycler/snapshot_chain_compactor.h" |
| |
| #include <gen_cpp/cloud.pb.h> |
| |
| #include "common/config.h" |
| #include "common/stopwatch.h" |
| #include "common/util.h" |
| #include "meta-store/keys.h" |
| #include "mock_accessor.h" |
| #include "recycler/hdfs_accessor.h" |
| #include "recycler/s3_accessor.h" |
| #include "recycler/util.h" |
| |
| namespace doris::cloud { |
| |
| SnapshotChainCompactor::SnapshotChainCompactor(std::shared_ptr<TxnKv> txn_kv) |
| : txn_kv_(std::move(txn_kv)) {} |
| |
| SnapshotChainCompactor::~SnapshotChainCompactor() { |
| if (!stopped()) { |
| stop(); |
| } |
| } |
| |
| int SnapshotChainCompactor::start() { |
| workers_.emplace_back([this]() { scan_instance_loop(); }); |
| workers_.emplace_back([this] { lease_compaction_jobs(); }); |
| for (int i = 0; i < config::snapshot_chain_compactor_concurrent; ++i) { |
| workers_.emplace_back([this]() { compaction_loop(); }); |
| } |
| |
| LOG_INFO("snapshot chain compactor started") |
| .tag("concurrent", config::snapshot_chain_compactor_concurrent); |
| |
| return 0; |
| } |
| |
| void SnapshotChainCompactor::stop() { |
| stopped_ = true; |
| notifier_.notify_all(); |
| pending_instance_cond_.notify_all(); |
| { |
| std::lock_guard lock(mtx_); |
| for (auto& [_, compactor] : compacting_instance_map_) { |
| compactor->stop(); |
| } |
| } |
| for (auto& w : workers_) { |
| if (w.joinable()) w.join(); |
| } |
| } |
| |
| void SnapshotChainCompactor::compaction_loop() { |
| pthread_setname_np(pthread_self(), "SNAP_COMPACTOR"); |
| while (!stopped()) { |
| // fetch instance to check |
| InstanceInfoPB instance; |
| { |
| std::unique_lock lock(mtx_); |
| pending_instance_cond_.wait( |
| lock, [&]() -> bool { return !pending_instance_queue_.empty() || stopped(); }); |
| if (stopped()) { |
| return; |
| } |
| instance = std::move(pending_instance_queue_.front()); |
| pending_instance_queue_.pop_front(); |
| pending_instance_set_.erase(instance.instance_id()); |
| } |
| const auto& instance_id = instance.instance_id(); |
| { |
| std::lock_guard lock(mtx_); |
| // skip instance in compacting |
| if (compacting_instance_map_.count(instance_id)) continue; |
| } |
| |
| auto compactor = std::make_shared<InstanceChainCompactor>(txn_kv_, instance); |
| if (compactor->init() != 0) { |
| LOG(WARNING) << "failed to init instance compactor, instance_id=" |
| << instance.instance_id(); |
| continue; |
| } |
| |
| std::string job_key = job_snapshot_chain_compactor_key(instance.instance_id()); |
| int ret = |
| prepare_instance_recycle_job(txn_kv_.get(), job_key, instance.instance_id(), |
| ip_port_, config::recycle_job_lease_expired_ms * 1000); |
| if (ret != 0) { // Prepare failed |
| continue; |
| } else { |
| std::lock_guard lock(mtx_); |
| compacting_instance_map_.emplace(instance_id, compactor); |
| } |
| if (stopped()) return; |
| |
| compactor->do_compact(); |
| { |
| std::lock_guard lock(mtx_); |
| compacting_instance_map_.erase(instance.instance_id()); |
| } |
| } |
| } |
| |
| void SnapshotChainCompactor::scan_instance_loop() { |
| std::this_thread::sleep_for( |
| std::chrono::seconds(config::recycler_sleep_before_scheduling_seconds)); |
| while (!stopped()) { |
| std::vector<InstanceInfoPB> instances; |
| get_all_instances(txn_kv_.get(), instances); |
| LOG(INFO) << "Snapshot chain compactor get instances: " << [&instances] { |
| std::stringstream ss; |
| for (auto& i : instances) ss << ' ' << i.instance_id(); |
| return ss.str(); |
| }(); |
| if (!instances.empty()) { |
| // enqueue instances |
| std::lock_guard lock(mtx_); |
| for (auto& instance : instances) { |
| if (instance.status() == InstanceInfoPB::DELETED) continue; |
| if (!is_snapshot_chain_need_compact(instance)) continue; |
| auto [_, success] = pending_instance_set_.insert(instance.instance_id()); |
| // skip instance already in pending queue |
| if (success) { |
| pending_instance_queue_.push_back(std::move(instance)); |
| } |
| } |
| pending_instance_cond_.notify_all(); |
| } |
| { |
| std::unique_lock lock(mtx_); |
| notifier_.wait_for(lock, std::chrono::seconds(config::recycle_interval_seconds), |
| [&]() { return stopped(); }); |
| } |
| } |
| } |
| |
| void SnapshotChainCompactor::lease_compaction_jobs() { |
| while (!stopped()) { |
| std::vector<std::string> instances; |
| instances.reserve(compacting_instance_map_.size()); |
| { |
| std::lock_guard lock(mtx_); |
| for (auto& [id, _] : compacting_instance_map_) { |
| instances.push_back(id); |
| } |
| } |
| for (auto& i : instances) { |
| std::string job_key = job_snapshot_chain_compactor_key(i); |
| int ret = lease_instance_recycle_job(txn_kv_.get(), job_key, i, ip_port_); |
| if (ret == 1) { |
| std::lock_guard lock(mtx_); |
| if (auto it = compacting_instance_map_.find(i); |
| it != compacting_instance_map_.end()) { |
| it->second->stop(); |
| } |
| } |
| } |
| { |
| std::unique_lock lock(mtx_); |
| notifier_.wait_for(lock, |
| std::chrono::milliseconds(config::recycle_job_lease_expired_ms / 3), |
| [&]() { return stopped(); }); |
| } |
| } |
| } |
| |
| bool is_instance_cloned_from_snapshot(const InstanceInfoPB& instance_info) { |
| return instance_info.has_source_instance_id() && !instance_info.source_instance_id().empty() && |
| instance_info.has_source_snapshot_id() && !instance_info.source_snapshot_id().empty(); |
| } |
| |
| int get_instance_info(TxnKv* txn_kv, const std::string& instance_id, |
| InstanceInfoPB& instance_info) { |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| LOG(WARNING) << "failed to create txn"; |
| return -1; |
| } |
| |
| std::string key = instance_key({instance_id}); |
| std::string val; |
| err = txn->get(key, &val); |
| if (err != TxnErrorCode::TXN_OK) { |
| LOG(WARNING) << "failed to get instance, instance_id=" << instance_id << " err=" << err; |
| return -1; |
| } |
| if (!instance_info.ParseFromString(val)) { |
| LOG(WARNING) << "failed to parse InstanceInfoPB, instance_id=" << instance_id; |
| return -2; |
| } |
| return 0; |
| } |
| |
| int is_instance_cloned(TxnKv* txn_kv, const std::string& instance_id, bool* is_cloned) { |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| LOG(WARNING) << "failed to create txn"; |
| return -1; |
| } |
| |
| std::string snapshot_ref_key_start = versioned::snapshot_reference_key_prefix(instance_id); |
| std::string snapshot_ref_key_end = snapshot_ref_key_start + '\xFF'; |
| std::unique_ptr<RangeGetIterator> it; |
| err = txn->get(snapshot_ref_key_start, snapshot_ref_key_end, &it, false, 1); |
| if (err != TxnErrorCode::TXN_OK) { |
| LOG(WARNING) << "failed to get snapshot reference key. instance_id=" << instance_id |
| << ", err=" << err; |
| return -1; |
| } |
| *is_cloned = it->has_next(); |
| return 0; |
| } |
| |
| bool SnapshotChainCompactor::is_snapshot_chain_need_compact(const InstanceInfoPB& instance_info) { |
| // compact the instance which meets the following conditions: |
| // 1. the instance is cloned from snapshot |
| // 2. its source instance is not cloned from other snapshots |
| // 3. the instance is cloned by other instances. |
| // |
| // for example, if the clone chain is [instance1 -> instance2 -> instance3], compact the instance2 |
| if (!is_instance_cloned_from_snapshot(instance_info)) { |
| return false; |
| } |
| |
| InstanceInfoPB source_instance_info; |
| if (get_instance_info(txn_kv_.get(), instance_info.source_instance_id(), |
| source_instance_info) != 0) { |
| LOG(WARNING) << "failed to get source instance info, instance_id=" |
| << instance_info.source_instance_id(); |
| return false; |
| } |
| if (is_instance_cloned_from_snapshot(source_instance_info)) { |
| return false; |
| } |
| |
| bool is_cloned = false; |
| if (is_instance_cloned(txn_kv_.get(), instance_info.instance_id(), &is_cloned) != 0) { |
| LOG(WARNING) << "failed to check is instance cloned, instance_id=" |
| << instance_info.instance_id(); |
| return false; |
| } |
| if (is_cloned) { |
| return true; |
| } |
| return false; |
| } |
| |
| InstanceChainCompactor::InstanceChainCompactor(std::shared_ptr<TxnKv> txn_kv, |
| const InstanceInfoPB& instance) |
| : txn_kv_(std::move(txn_kv)), |
| instance_id_(instance.instance_id()), |
| instance_info_(instance) {} |
| |
| InstanceChainCompactor::~InstanceChainCompactor() { |
| if (!stopped()) { |
| stop(); |
| } |
| } |
| |
| int InstanceChainCompactor::init() { |
| int ret = init_obj_store_accessors(); |
| if (ret != 0) { |
| return ret; |
| } |
| |
| return init_storage_vault_accessors(); |
| } |
| |
| int InstanceChainCompactor::init_obj_store_accessors() { |
| for (const auto& obj_info : instance_info_.obj_info()) { |
| #ifdef UNIT_TEST |
| auto accessor = std::make_shared<MockAccessor>(); |
| #else |
| auto s3_conf = S3Conf::from_obj_store_info(obj_info); |
| if (!s3_conf) { |
| LOG(WARNING) << "failed to init object accessor, instance_id=" << instance_id_; |
| return -1; |
| } |
| |
| std::shared_ptr<S3Accessor> accessor; |
| int ret = S3Accessor::create(std::move(*s3_conf), &accessor); |
| if (ret != 0) { |
| LOG(WARNING) << "failed to init object accessor. instance_id=" << instance_id_ |
| << " resource_id=" << obj_info.id(); |
| return ret; |
| } |
| #endif |
| |
| accessor_map_.emplace(obj_info.id(), std::move(accessor)); |
| } |
| |
| return 0; |
| } |
| |
| int InstanceChainCompactor::init_storage_vault_accessors() { |
| if (instance_info_.resource_ids().empty()) { |
| return 0; |
| } |
| |
| FullRangeGetOptions opts(txn_kv_); |
| opts.prefetch = true; |
| auto it = txn_kv_->full_range_get(storage_vault_key({instance_id_, ""}), |
| storage_vault_key({instance_id_, "\xff"}), std::move(opts)); |
| |
| for (auto kv = it->next(); kv.has_value(); kv = it->next()) { |
| auto [k, v] = *kv; |
| StorageVaultPB vault; |
| if (!vault.ParseFromArray(v.data(), v.size())) { |
| LOG(WARNING) << "malformed storage vault, unable to deserialize key=" << hex(k); |
| return -1; |
| } |
| TEST_SYNC_POINT_CALLBACK("InstanceRecycler::init_storage_vault_accessors.mock_vault", |
| &accessor_map_, &vault); |
| if (vault.has_hdfs_info()) { |
| #ifdef ENABLE_HDFS_STORAGE_VAULT |
| auto accessor = std::make_shared<HdfsAccessor>(vault.hdfs_info()); |
| int ret = accessor->init(); |
| if (ret != 0) { |
| LOG(WARNING) << "failed to init hdfs accessor. instance_id=" << instance_id_ |
| << " resource_id=" << vault.id() << " name=" << vault.name(); |
| return ret; |
| } |
| |
| accessor_map_.emplace(vault.id(), std::move(accessor)); |
| #else |
| LOG(ERROR) << "HDFS is disabled (via the ENABLE_HDFS_STORAGE_VAULT build option), " |
| << "but HDFS storage vaults were detected"; |
| #endif |
| } else if (vault.has_obj_info()) { |
| #ifdef UNIT_TEST |
| auto accessor = std::make_shared<MockAccessor>(); |
| #else |
| auto s3_conf = S3Conf::from_obj_store_info(vault.obj_info()); |
| if (!s3_conf) { |
| LOG(WARNING) << "failed to init object accessor, instance_id=" << instance_id_; |
| return -1; |
| } |
| |
| std::shared_ptr<S3Accessor> accessor; |
| int ret = S3Accessor::create(std::move(*s3_conf), &accessor); |
| if (ret != 0) { |
| LOG(WARNING) << "failed to init s3 accessor. instance_id=" << instance_id_ |
| << " resource_id=" << vault.id() << " name=" << vault.name(); |
| return ret; |
| } |
| #endif |
| |
| accessor_map_.emplace(vault.id(), std::move(accessor)); |
| } |
| } |
| |
| if (!it->is_valid()) { |
| LOG_WARNING("failed to get storage vault kv"); |
| return -1; |
| } |
| return 0; |
| } |
| |
| int InstanceChainCompactor::do_compact() { |
| LOG_WARNING("start snapshot chain compaction").tag("instance_id", instance_id_); |
| |
| StopWatch stop_watch; |
| DORIS_CLOUD_DEFER { |
| LOG_WARNING("snapshot chain compaction done") |
| .tag("instance_id", instance_id_) |
| .tag("cost(sec)", stop_watch.elapsed_seconds()); |
| }; |
| |
| SnapshotManager snapshot_mgr(txn_kv_); |
| int res = snapshot_mgr.compact_snapshot_chains(this); |
| if (res != 0) { |
| LOG_WARNING("failed to compact snapshot chains").tag("instance_id", instance_id_); |
| return res; |
| } |
| |
| return handle_compaction_completion(); |
| } |
| |
| int InstanceChainCompactor::handle_compaction_completion() { |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| LOG_WARNING("failed to create txn in chain compaction").tag("instance_id", instance_id_); |
| return -1; |
| } |
| |
| std::string key = instance_key(instance_id_); |
| std::string instance_value; |
| err = txn->get(key, &instance_value); |
| if (err != TxnErrorCode::TXN_OK) { |
| LOG_WARNING("failed to get instance info in chain compaction") |
| .tag("instance_id", instance_id_) |
| .tag("error", err); |
| return -1; |
| } |
| |
| InstanceInfoPB instance_info; |
| if (!instance_info.ParseFromString(instance_value)) { |
| LOG_WARNING("failed to parse instance info in chain compaction") |
| .tag("instance_id", instance_id_); |
| return -1; |
| } |
| |
| Versionstamp snapshot_versionstamp; |
| if (!SnapshotManager::parse_snapshot_versionstamp(instance_info.source_snapshot_id(), |
| &snapshot_versionstamp)) { |
| LOG_WARNING("failed to parse snapshot_id to versionstamp in chain compaction") |
| .tag("instance_id", instance_id_) |
| .tag("source_instance_id", instance_info.source_instance_id()) |
| .tag("snapshot_id", instance_info.source_snapshot_id()); |
| return -1; |
| } |
| auto source_instance_id = instance_info.source_instance_id(); |
| auto snapshot_id = instance_info.source_snapshot_id(); |
| versioned::SnapshotReferenceKeyInfo ref_key_info {source_instance_id, snapshot_versionstamp, |
| instance_id_}; |
| std::string reference_key = versioned::snapshot_reference_key(ref_key_info); |
| txn->remove(reference_key); |
| |
| // instance_info.clear_source_instance_id(); |
| instance_info.clear_source_snapshot_id(); |
| instance_info.clear_compacted_key_sets(); |
| txn->atomic_add(system_meta_service_instance_update_key(), 1); |
| txn->put(key, instance_info.SerializeAsString()); |
| |
| err = txn->commit(); |
| if (err != TxnErrorCode::TXN_OK) { |
| LOG_WARNING("failed to commit instance info in chain compaction") |
| .tag("instance_id", instance_id_) |
| .tag("source_instance_id", source_instance_id) |
| .tag("snapshot_id", snapshot_id) |
| .tag("error", err); |
| return -1; |
| } |
| |
| LOG_INFO("finish chain compaction") |
| .tag("instance_id", instance_id_) |
| .tag("source_instance_id", source_instance_id) |
| .tag("snapshot_id", snapshot_id); |
| return 0; |
| } |
| |
| } // namespace doris::cloud |