| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include <brpc/builtin_service.pb.h> |
| #include <brpc/server.h> |
| #include <butil/endpoint.h> |
| #include <butil/strings/string_split.h> |
| #include <bvar/status.h> |
| #include <gen_cpp/cloud.pb.h> |
| #include <gen_cpp/olap_file.pb.h> |
| |
| #include <algorithm> |
| #include <cstddef> |
| #include <cstdint> |
| #include <string> |
| #include <string_view> |
| #include <utility> |
| #include <vector> |
| |
| #include "common/defer.h" |
| #include "common/encryption_util.h" |
| #include "common/logging.h" |
| #include "common/stopwatch.h" |
| #include "common/util.h" |
| #include "meta-service/meta_service.h" |
| #include "meta-service/meta_service_schema.h" |
| #include "meta-store/blob_message.h" |
| #include "meta-store/document_message.h" |
| #include "meta-store/keys.h" |
| #include "meta-store/meta_reader.h" |
| #include "meta-store/txn_kv.h" |
| #include "meta-store/txn_kv_error.h" |
| #include "meta-store/versioned_value.h" |
| #include "meta-store/versionstamp.h" |
| #include "recycler/checker.h" |
| #include "recycler/recycler.h" |
| #include "recycler/util.h" |
| |
// Evaluates `stmt` exactly once; if it yields a non-zero int status, returns that status
// from the enclosing function, otherwise falls through.
//
// The temporary uses a deliberately unusual name: with a short name such as `res`, a call
// like RETURN_ON_FAILURE(res) would expand to `int res = (res);`, i.e. the new variable
// would be initialized from itself instead of from the caller's variable.
#define RETURN_ON_FAILURE(stmt)                             \
    do {                                                    \
        const int return_on_failure_status_ = (stmt);       \
        if (return_on_failure_status_ != 0) {               \
            return return_on_failure_status_;               \
        }                                                   \
    } while (0)
| |
| namespace doris::cloud { |
| |
| using namespace std::chrono; |
| |
| int OperationLogRecycleChecker::init() { |
| source_snapshot_versionstamp_ = Versionstamp::min(); |
| if (instance_info_.has_source_snapshot_id() && |
| !SnapshotManager::parse_snapshot_versionstamp(instance_info_.source_snapshot_id(), |
| &source_snapshot_versionstamp_)) { |
| LOG_WARNING("failed to parse versionstamp from source snapshot id") |
| .tag("source_snapshot_id", instance_info_.source_snapshot_id()); |
| return -1; |
| } |
| |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| LOG_WARNING("failed to create txn").tag("err", err); |
| return -1; |
| } |
| |
| snapshots_.clear(); |
| snapshot_indexes_.clear(); |
| MetaReader reader(instance_id_); |
| std::vector<std::pair<SnapshotPB, Versionstamp>> snapshots; |
| err = reader.get_snapshots(txn.get(), &snapshots); |
| if (err != TxnErrorCode::TXN_OK) { |
| LOG_WARNING("failed to get snapshots").tag("err", err); |
| return -1; |
| } |
| |
| int64_t read_version = -1; |
| err = txn->get_read_version(&read_version); |
| if (err != TxnErrorCode::TXN_OK) { |
| LOG_WARNING("failed to get the read version").tag("err", err); |
| return -1; |
| } |
| |
| max_versionstamp_ = Versionstamp(read_version, 0); |
| for (size_t i = 0; i < snapshots.size(); ++i) { |
| auto&& [snapshot, versionstamp] = snapshots[i]; |
| if (snapshot.status() == SnapshotStatus::SNAPSHOT_ABORTED || |
| snapshot.status() == SnapshotStatus::SNAPSHOT_RECYCLED) { |
| continue; |
| } |
| snapshot_indexes_.insert(std::make_pair(versionstamp, snapshots_.size())); |
| snapshots_.push_back(std::make_pair(std::move(snapshot), versionstamp)); |
| } |
| |
| return 0; |
| } |
| |
| bool OperationLogRecycleChecker::can_recycle(const Versionstamp& log_versionstamp, |
| int64_t log_min_timestamp, |
| OperationLogReferenceInfo* reference_info) const { |
| Versionstamp log_min_read_timestamp(log_min_timestamp, 0); |
| if (log_versionstamp > max_versionstamp_) { |
| // Not recycleable. |
| return false; |
| } |
| |
| // Do not recycle operation logs referenced by active snapshots. |
| if (log_min_read_timestamp < source_snapshot_versionstamp_) { |
| reference_info->referenced_by_instance = true; |
| return false; |
| } |
| |
| auto it = snapshot_indexes_.lower_bound(log_min_read_timestamp); |
| if (it != snapshot_indexes_.end() && snapshots_[it->second].second < log_versionstamp) { |
| // in [log_min_read_timestmap, log_versionstamp) |
| reference_info->referenced_by_snapshot = true; |
| reference_info->referenced_snapshot_timestamp = snapshots_[it->second].second; |
| return false; |
| } |
| |
| return true; |
| } |
| |
| // A recycler for operation logs. |
| class OperationLogRecycler { |
| public: |
| OperationLogRecycler(std::string_view instance_id, TxnKv* txn_kv, Versionstamp log_version, |
| int64_t min_read_version, const std::vector<std::string>& raw_keys) |
| : instance_id_(instance_id), |
| txn_kv_(txn_kv), |
| log_version_(log_version), |
| min_read_versionstamp_(min_read_version), |
| raw_keys_(raw_keys) {} |
| OperationLogRecycler(const OperationLogRecycler&) = delete; |
| OperationLogRecycler& operator=(const OperationLogRecycler&) = delete; |
| |
| int begin(); |
| |
| int recycle_commit_partition_log(const CommitPartitionLogPB& commit_partition_log); |
| |
| int recycle_drop_partition_log(const DropPartitionLogPB& drop_partition_log); |
| |
| int recycle_commit_index_log(const CommitIndexLogPB& commit_index_log); |
| |
| int recycle_drop_index_log(const DropIndexLogPB& drop_index_log); |
| |
| int recycle_commit_txn_log(const CommitTxnLogPB& commit_txn_log); |
| |
| int recycle_update_tablet_log(const UpdateTabletLogPB& update_tablet_log); |
| |
| int recycle_compaction_log(const CompactionLogPB& compaction_log); |
| |
| int recycle_schema_change_log(const SchemaChangeLogPB& schema_change_log); |
| |
| int commit(); |
| |
| private: |
| int recycle_table_version(int64_t table_id); |
| int recycle_tablet_meta(int64_t tablet_id); |
| int recycle_tablet_load_stats(int64_t tablet_id); |
| int recycle_tablet_compact_stats(int64_t tablet_id); |
| int recycle_partition_version(int64_t partition_id); |
| int recycle_rowset_meta(int64_t tablet_id, int64_t end_version, const std::string& rowset_id); |
| |
| std::string_view instance_id_; |
| TxnKv* txn_kv_; |
| Versionstamp log_version_; |
| Versionstamp min_read_versionstamp_; |
| const std::vector<std::string>& raw_keys_; |
| |
| std::unique_ptr<Transaction> txn_; |
| }; |
| |
| int OperationLogRecycler::recycle_commit_partition_log( |
| const CommitPartitionLogPB& commit_partition_log) { |
| VLOG_DEBUG << "recycle commit partition log: " << commit_partition_log.ShortDebugString(); |
| |
| int64_t table_id = commit_partition_log.table_id(); |
| return recycle_table_version(table_id); |
| } |
| |
| int OperationLogRecycler::recycle_drop_partition_log(const DropPartitionLogPB& drop_partition_log) { |
| VLOG_DEBUG << "recycle drop partition log: " << drop_partition_log.ShortDebugString(); |
| |
| for (int64_t partition_id : drop_partition_log.partition_ids()) { |
| RecyclePartitionPB recycle_partition_pb; |
| recycle_partition_pb.set_db_id(drop_partition_log.db_id()); |
| recycle_partition_pb.set_table_id(drop_partition_log.table_id()); |
| *recycle_partition_pb.mutable_index_id() = drop_partition_log.index_ids(); |
| recycle_partition_pb.set_creation_time(::time(nullptr)); |
| recycle_partition_pb.set_expiration(drop_partition_log.expired_at_s()); |
| recycle_partition_pb.set_state(RecyclePartitionPB::DROPPED); |
| std::string recycle_partition_value; |
| if (!recycle_partition_pb.SerializeToString(&recycle_partition_value)) { |
| LOG_WARNING("failed to serialize RecyclePartitionPB").tag("partition_id", partition_id); |
| return -1; |
| } |
| std::string recycle_key = recycle_partition_key({instance_id_, partition_id}); |
| LOG_INFO("put recycle partition key") |
| .tag("recycle_key", hex(recycle_key)) |
| .tag("partition_id", partition_id); |
| txn_->put(recycle_key, recycle_partition_value); |
| } |
| |
| if (drop_partition_log.update_table_version()) { |
| return recycle_table_version(drop_partition_log.table_id()); |
| } |
| return 0; |
| } |
| |
| int OperationLogRecycler::recycle_commit_index_log(const CommitIndexLogPB& commit_index_log) { |
| VLOG_DEBUG << "recycle commit index log: " << commit_index_log.ShortDebugString(); |
| |
| if (commit_index_log.update_table_version()) { |
| int64_t table_id = commit_index_log.table_id(); |
| return recycle_table_version(table_id); |
| } |
| return 0; |
| } |
| |
| int OperationLogRecycler::recycle_drop_index_log(const DropIndexLogPB& drop_index_log) { |
| VLOG_DEBUG << "recycle drop index log: " << drop_index_log.ShortDebugString(); |
| |
| for (int64_t index_id : drop_index_log.index_ids()) { |
| RecycleIndexPB recycle_index_pb; |
| recycle_index_pb.set_db_id(drop_index_log.db_id()); |
| recycle_index_pb.set_table_id(drop_index_log.table_id()); |
| recycle_index_pb.set_creation_time(::time(nullptr)); |
| recycle_index_pb.set_expiration(drop_index_log.expiration()); |
| recycle_index_pb.set_state(RecycleIndexPB::DROPPED); |
| std::string recycle_index_value; |
| if (!recycle_index_pb.SerializeToString(&recycle_index_value)) { |
| LOG_WARNING("failed to serialize RecycleIndexPB").tag("index_id", index_id); |
| return -1; |
| } |
| std::string recycle_key = recycle_index_key({instance_id_, index_id}); |
| LOG_INFO("put recycle index key") |
| .tag("recycle_key", hex(recycle_key)) |
| .tag("index_id", index_id); |
| txn_->put(recycle_key, recycle_index_value); |
| } |
| return 0; |
| } |
| |
| int OperationLogRecycler::recycle_commit_txn_log(const CommitTxnLogPB& commit_txn_log) { |
| VLOG_DEBUG << "recycle commit txn log: " << commit_txn_log.ShortDebugString(); |
| |
| MetaReader meta_reader(instance_id_, log_version_); |
| |
| int64_t txn_id = commit_txn_log.txn_id(); |
| AnnotateTag txn_tag("txn_id", txn_id); |
| for (const auto& [partition_id, _] : commit_txn_log.partition_version_map()) { |
| RETURN_ON_FAILURE(recycle_partition_version(partition_id)); |
| } |
| |
| for (const auto& [tablet_id, _] : commit_txn_log.tablet_to_partition_map()) { |
| RETURN_ON_FAILURE(recycle_tablet_load_stats(tablet_id)); |
| } |
| |
| for (int64_t table_id : commit_txn_log.table_ids()) { |
| RETURN_ON_FAILURE(recycle_table_version(table_id)); |
| } |
| |
| int64_t db_id = commit_txn_log.db_id(); |
| std::string recycle_val; |
| std::string recycle_key = recycle_txn_key({instance_id_, db_id, txn_id}); |
| RecycleTxnPB recycle_pb; |
| auto now_time = system_clock::now(); |
| uint64_t visible_time = duration_cast<milliseconds>(now_time.time_since_epoch()).count(); |
| recycle_pb.set_creation_time(visible_time); |
| recycle_pb.set_label(commit_txn_log.recycle_txn().label()); |
| |
| if (!recycle_pb.SerializeToString(&recycle_val)) { |
| LOG_ERROR("Failed to serialize RecycleTxnPB").tag("db_id", db_id); |
| return -1; |
| } |
| |
| LOG_INFO("put recycle txn key").tag("recycle_key", hex(recycle_key)).tag("db_id", db_id); |
| txn_->put(recycle_key, recycle_val); |
| return 0; |
| } |
| |
| int OperationLogRecycler::recycle_update_tablet_log(const UpdateTabletLogPB& update_tablet_log) { |
| VLOG_DEBUG << "recycle update tablet log: " << update_tablet_log.ShortDebugString(); |
| |
| MetaReader meta_reader(instance_id_, txn_kv_, log_version_); |
| for (int64_t tablet_id : update_tablet_log.tablet_ids()) { |
| RETURN_ON_FAILURE(recycle_tablet_meta(tablet_id)); |
| } |
| |
| return 0; |
| } |
| |
| int OperationLogRecycler::recycle_compaction_log(const CompactionLogPB& compaction_log) { |
| VLOG_DEBUG << "recycle compaction log: " << compaction_log.ShortDebugString(); |
| |
| MetaReader meta_reader(instance_id_, log_version_); |
| |
| int64_t tablet_id = compaction_log.tablet_id(); |
| RETURN_ON_FAILURE(recycle_tablet_compact_stats(tablet_id)); |
| |
| for (const RecycleRowsetPB& recycle_rowset_pb : compaction_log.recycle_rowsets()) { |
| // recycle rowset meta key |
| std::string recycle_rowset_value; |
| if (!recycle_rowset_pb.SerializeToString(&recycle_rowset_value)) { |
| LOG_WARNING("failed to serialize RecycleRowsetPB") |
| .tag("recycle rowset pb", recycle_rowset_pb.ShortDebugString()); |
| return -1; |
| } |
| std::string recycle_key = |
| recycle_rowset_key({instance_id_, compaction_log.tablet_id(), |
| recycle_rowset_pb.rowset_meta().rowset_id_v2()}); |
| // Put recycle rowset key to track recycled rowset metadata |
| LOG_INFO("put recycle rowset key") |
| .tag("recycle_key", hex(recycle_key)) |
| .tag("tablet_id", compaction_log.tablet_id()) |
| .tag("rowset_id_v2", recycle_rowset_pb.rowset_meta().rowset_id_v2()) |
| .tag("start_version", recycle_rowset_pb.rowset_meta().start_version()) |
| .tag("end_version", recycle_rowset_pb.rowset_meta().end_version()); |
| txn_->put(recycle_key, recycle_rowset_value); |
| |
| // Remove rowset compact key and rowset load key for input rowsets |
| RETURN_ON_FAILURE(recycle_rowset_meta(recycle_rowset_pb.rowset_meta().tablet_id(), |
| recycle_rowset_pb.rowset_meta().end_version(), |
| recycle_rowset_pb.rowset_meta().rowset_id_v2())); |
| } |
| return 0; |
| } |
| |
| int OperationLogRecycler::recycle_schema_change_log(const SchemaChangeLogPB& schema_change_log) { |
| VLOG_DEBUG << "recycle schema change log: " << schema_change_log.ShortDebugString(); |
| |
| MetaReader meta_reader(instance_id_, log_version_); |
| int64_t new_tablet_id = schema_change_log.new_tablet_id(); |
| RETURN_ON_FAILURE(recycle_tablet_meta(new_tablet_id)); |
| RETURN_ON_FAILURE(recycle_tablet_compact_stats(new_tablet_id)); |
| |
| for (const RecycleRowsetPB& recycle_rowset_pb : schema_change_log.recycle_rowsets()) { |
| // recycle rowset meta key |
| std::string recycle_rowset_value; |
| if (!recycle_rowset_pb.SerializeToString(&recycle_rowset_value)) { |
| LOG_WARNING("failed to serialize RecycleRowsetPB") |
| .tag("recycle rowset pb", recycle_rowset_pb.ShortDebugString()); |
| return -1; |
| } |
| std::string recycle_key = |
| recycle_rowset_key({instance_id_, schema_change_log.new_tablet_id(), |
| recycle_rowset_pb.rowset_meta().rowset_id_v2()}); |
| // Put recycle rowset key to track recycled rowset metadata |
| LOG_INFO("put recycle rowset key") |
| .tag("recycle_key", hex(recycle_key)) |
| .tag("new_tablet_id", schema_change_log.new_tablet_id()) |
| .tag("rowset_id_v2", recycle_rowset_pb.rowset_meta().rowset_id_v2()) |
| .tag("start_version", recycle_rowset_pb.rowset_meta().start_version()) |
| .tag("end_version", recycle_rowset_pb.rowset_meta().end_version()); |
| txn_->put(recycle_key, recycle_rowset_value); |
| |
| RETURN_ON_FAILURE(recycle_rowset_meta(recycle_rowset_pb.rowset_meta().tablet_id(), |
| recycle_rowset_pb.rowset_meta().end_version(), |
| recycle_rowset_pb.rowset_meta().rowset_id_v2())); |
| } |
| return 0; |
| } |
| |
| int OperationLogRecycler::begin() { |
| TxnErrorCode err = txn_kv_->create_txn(&txn_); |
| if (err != TxnErrorCode::TXN_OK) { |
| LOG_WARNING("failed to create transaction for recycling operation log") |
| .tag("error_code", err); |
| return -1; |
| } |
| |
| return 0; |
| } |
| |
| int OperationLogRecycler::commit() { |
| // Remove the operation log entry itself after recycling its contents |
| LOG_INFO("remove operation log key") |
| .tag("log_version", log_version_.to_string()) |
| .tag("min_read_version", min_read_versionstamp_.version()); |
| for (const auto& raw_key : raw_keys_) { |
| txn_->remove(raw_key); |
| } |
| |
| TxnErrorCode err = txn_->commit(); |
| if (err != TxnErrorCode::TXN_OK) { |
| LOG_WARNING("failed to remove operation log").tag("error_code", err); |
| return -1; |
| } |
| |
| return 0; |
| } |
| |
| int OperationLogRecycler::recycle_table_version(int64_t table_id) { |
| MetaReader meta_reader(instance_id_, log_version_); |
| Versionstamp prev_version; |
| TxnErrorCode err = meta_reader.get_table_version(txn_.get(), table_id, &prev_version); |
| if (err == TxnErrorCode::TXN_OK) { |
| if (prev_version < min_read_versionstamp_) { |
| LOG_FATAL("table version is less than min read versionstamp") |
| .tag("table_id", table_id) |
| .tag("prev_version", prev_version.to_string()) |
| .tag("min_read_version", min_read_versionstamp_.version()); |
| } |
| |
| std::string table_version_key = versioned::table_version_key({instance_id_, table_id}); |
| versioned_remove(txn_.get(), table_version_key, prev_version); |
| LOG_INFO("recycle table version") |
| .tag("table_id", table_id) |
| .tag("prev_version", prev_version.to_string()); |
| return 0; |
| } else if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| LOG_WARNING("failed to get table version for recycling operation log") |
| .tag("table_id", table_id) |
| .tag("error_code", err); |
| return -1; |
| } else { |
| VLOG_DEBUG << "No previous table version found for recycling" |
| << " table_id=" << table_id; |
| return 0; |
| } |
| } |
| |
| int OperationLogRecycler::recycle_tablet_meta(int64_t tablet_id) { |
| MetaReader meta_reader(instance_id_, log_version_); |
| TabletMetaCloudPB tablet_meta; |
| Versionstamp prev_version; |
| TxnErrorCode err = |
| meta_reader.get_tablet_meta(txn_.get(), tablet_id, &tablet_meta, &prev_version); |
| if (err == TxnErrorCode::TXN_OK) { |
| if (prev_version < min_read_versionstamp_) { |
| LOG_FATAL("tablet meta version is less than min read versionstamp") |
| .tag("tablet_id", tablet_id) |
| .tag("prev_version", prev_version.to_string()) |
| .tag("min_read_version", min_read_versionstamp_.version()); |
| } |
| std::string tablet_meta_key = versioned::meta_tablet_key({instance_id_, tablet_id}); |
| versioned_remove(txn_.get(), tablet_meta_key, prev_version); |
| LOG_INFO("recycle tablet meta") |
| .tag("tablet_id", tablet_id) |
| .tag("prev_version", prev_version.to_string()); |
| return 0; |
| } else if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| LOG_WARNING("failed to get tablet meta for recycling operation log") |
| .tag("tablet_id", tablet_id) |
| .tag("error_code", err); |
| return -1; |
| } else { |
| VLOG_DEBUG << "No previous tablet meta found for recycling" |
| << " tablet_id=" << tablet_id; |
| return 0; |
| } |
| } |
| |
| int OperationLogRecycler::recycle_tablet_load_stats(int64_t tablet_id) { |
| MetaReader meta_reader(instance_id_, log_version_); |
| Versionstamp prev_version; |
| TxnErrorCode err = |
| meta_reader.get_tablet_load_stats(txn_.get(), tablet_id, nullptr, &prev_version); |
| if (err == TxnErrorCode::TXN_OK) { |
| if (prev_version < min_read_versionstamp_) { |
| LOG_FATAL("tablet load stats version is less than min read versionstamp") |
| .tag("tablet_id", tablet_id) |
| .tag("prev_version", prev_version.to_string()) |
| .tag("min_read_version", min_read_versionstamp_.version()); |
| } |
| |
| std::string tablet_load_stats_key = |
| versioned::tablet_load_stats_key({instance_id_, tablet_id}); |
| versioned_remove(txn_.get(), tablet_load_stats_key, prev_version); |
| LOG_INFO("recycle tablet load stats") |
| .tag("tablet_id", tablet_id) |
| .tag("prev_version", prev_version.to_string()); |
| return 0; |
| } else if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| LOG_WARNING("failed to get tablet load stats for recycling operation log") |
| .tag("tablet_id", tablet_id) |
| .tag("error_code", err); |
| return -1; |
| } else { |
| VLOG_DEBUG << "No previous tablet load stats found for recycling" |
| << " tablet_id=" << tablet_id; |
| return 0; |
| } |
| } |
| |
| int OperationLogRecycler::recycle_tablet_compact_stats(int64_t tablet_id) { |
| MetaReader meta_reader(instance_id_, log_version_); |
| Versionstamp prev_version; |
| TxnErrorCode err = |
| meta_reader.get_tablet_compact_stats(txn_.get(), tablet_id, nullptr, &prev_version); |
| if (err == TxnErrorCode::TXN_OK) { |
| if (prev_version < min_read_versionstamp_) { |
| LOG_FATAL("tablet compact stats version is less than min read versionstamp") |
| .tag("tablet_id", tablet_id) |
| .tag("prev_version", prev_version.to_string()) |
| .tag("min_read_version", min_read_versionstamp_.version()); |
| } |
| std::string tablet_compact_stats_key = |
| versioned::tablet_compact_stats_key({instance_id_, tablet_id}); |
| versioned_remove(txn_.get(), tablet_compact_stats_key, prev_version); |
| LOG_INFO("recycle tablet compact stats") |
| .tag("tablet_id", tablet_id) |
| .tag("prev_version", prev_version.to_string()); |
| return 0; |
| } else if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| LOG_WARNING("failed to get tablet compact stats for recycling operation log") |
| .tag("tablet_id", tablet_id) |
| .tag("error_code", err); |
| return -1; |
| } else { |
| VLOG_DEBUG << "No previous tablet compact stats found for recycling" |
| << " tablet_id=" << tablet_id; |
| return 0; |
| } |
| } |
| |
| int OperationLogRecycler::recycle_partition_version(int64_t partition_id) { |
| MetaReader meta_reader(instance_id_, log_version_); |
| Versionstamp prev_version; |
| TxnErrorCode err = |
| meta_reader.get_partition_version(txn_.get(), partition_id, nullptr, &prev_version); |
| if (err == TxnErrorCode::TXN_OK) { |
| if (prev_version < min_read_versionstamp_) { |
| LOG_FATAL("partition version is less than min read versionstamp") |
| .tag("partition_id", partition_id) |
| .tag("prev_version", prev_version.to_string()) |
| .tag("min_read_version", min_read_versionstamp_.version()); |
| } |
| std::string partition_version_key = |
| versioned::partition_version_key({instance_id_, partition_id}); |
| versioned_remove(txn_.get(), partition_version_key, prev_version); |
| LOG_INFO("recycle partition version") |
| .tag("partition_id", partition_id) |
| .tag("prev_version", prev_version.to_string()); |
| return 0; |
| } else if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| LOG_WARNING("failed to get partition version for recycling operation log") |
| .tag("partition_id", partition_id) |
| .tag("error_code", err); |
| return -1; |
| } else { |
| VLOG_DEBUG << "No previous partition version found for recycling" |
| << " partition_id=" << partition_id; |
| return 0; |
| } |
| } |
| |
| int OperationLogRecycler::recycle_rowset_meta(int64_t tablet_id, int64_t end_version, |
| const std::string& rowset_id) { |
| // Remove rowset compact key and rowset load key for input rowsets |
| std::string meta_rowset_compact_key = |
| versioned::meta_rowset_compact_key({instance_id_, tablet_id, end_version}); |
| std::string meta_rowset_load_key = |
| versioned::meta_rowset_load_key({instance_id_, tablet_id, end_version}); |
| |
| RowsetMetaCloudPB rowset_meta; |
| Versionstamp version; |
| TxnErrorCode err = versioned::document_get(txn_.get(), meta_rowset_compact_key, log_version_, |
| &rowset_meta, &version); |
| if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| LOG_WARNING("Failed to get meta rowset compact key") |
| .tag("instance_id", instance_id_) |
| .tag("compact_key", hex(meta_rowset_compact_key)) |
| .tag("error", err); |
| return -1; |
| } else if (err == TxnErrorCode::TXN_OK && rowset_meta.rowset_id_v2() == rowset_id) { |
| // Remove meta rowset compact key for input rowset that was consumed in compaction |
| versioned::document_remove<doris::RowsetMetaCloudPB>(txn_.get(), meta_rowset_compact_key, |
| version); |
| LOG_INFO("remove meta rowset compact key") |
| .tag("instance_id", instance_id_) |
| .tag("compact_key", hex(meta_rowset_compact_key)) |
| .tag("tablet_id", tablet_id) |
| .tag("rowset_id", rowset_id) |
| .tag("start_version", rowset_meta.start_version()) |
| .tag("end_version", rowset_meta.end_version()); |
| return 0; |
| } else if (err = versioned::document_get(txn_.get(), meta_rowset_load_key, log_version_, |
| &rowset_meta, &version); |
| err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| LOG_WARNING("Failed to get meta rowset load key") |
| .tag("instance_id", instance_id_) |
| .tag("load_key", hex(meta_rowset_load_key)) |
| .tag("error", err); |
| return -1; |
| } else if (err == TxnErrorCode::TXN_OK && rowset_meta.rowset_id_v2() == rowset_id) { |
| // Remove meta rowset load key for input rowset that was consumed in compaction |
| versioned::document_remove<doris::RowsetMetaCloudPB>(txn_.get(), meta_rowset_load_key, |
| version); |
| LOG_INFO("remove meta rowset load key") |
| .tag("instance_id", instance_id_) |
| .tag("load_key", hex(meta_rowset_load_key)) |
| .tag("tablet_id", tablet_id) |
| .tag("rowset_id", rowset_id) |
| .tag("start_version", rowset_meta.start_version()) |
| .tag("end_version", rowset_meta.end_version()); |
| return 0; |
| } else { |
| VLOG_DEBUG << "No previous rowset meta found for recycling" |
| << " tablet_id=" << tablet_id << " end_version=" << end_version |
| << " rowset_id=" << rowset_id; |
| return 0; |
| } |
| } |
| |
| static TxnErrorCode get_txn_info(TxnKv* txn_kv, std::string_view instance_id, int64_t db_id, |
| int64_t txn_id, TxnInfoPB* txn_info) { |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| |
| std::string key = txn_info_key({instance_id, db_id, txn_id}); |
| std::string txn_info_value; |
| err = txn->get(key, &txn_info_value); |
| if (err != TxnErrorCode::TXN_OK) { |
| LOG_WARNING("failed to get txn info") |
| .tag("key", hex(key)) |
| .tag("db_id", db_id) |
| .tag("txn_id", txn_id) |
| .tag("error_code", err); |
| return err; |
| } |
| |
| if (!txn_info->ParseFromString(txn_info_value)) { |
| LOG_WARNING("failed to parse TxnInfoPB") |
| .tag("value_size", txn_info_value.size()) |
| .tag("key", hex(key)) |
| .tag("txn_id", txn_id); |
| return TxnErrorCode::TXN_INVALID_DATA; |
| } |
| |
| return TxnErrorCode::TXN_OK; |
| } |
| |
| int InstanceRecycler::recycle_operation_logs() { |
| if (!should_recycle_versioned_keys()) { |
| VLOG_DEBUG << "instance " << instance_id_ |
| << " is not need to recycle versioned keys, skip recycling operation logs. " |
| "multi version status: " |
| << MultiVersionStatus_Name(instance_info_.multi_version_status()) |
| << " snapshot switch status: " |
| << SnapshotSwitchStatus_Name(instance_info_.snapshot_switch_status()); |
| return 0; |
| } |
| |
| AnnotateTag tag("instance_id", instance_id_); |
| LOG_WARNING("begin to recycle operation logs"); |
| |
| StopWatch stop_watch; |
| size_t total_operation_logs = 0; |
| size_t recycled_operation_logs = 0; |
| size_t operation_log_data_size = 0; |
| size_t max_operation_log_data_size = 0; |
| size_t recycled_operation_log_data_size = 0; |
| |
| DORIS_CLOUD_DEFER { |
| int64_t cost = stop_watch.elapsed_us() / 1000'000; |
| LOG_WARNING("recycle operation logs, cost={}s", cost) |
| .tag("total_operation_logs", total_operation_logs) |
| .tag("recycled_operation_logs", recycled_operation_logs) |
| .tag("operation_log_data_size", operation_log_data_size) |
| .tag("max_operation_log_data_size", max_operation_log_data_size) |
| .tag("recycled_operation_log_data_size", recycled_operation_log_data_size); |
| }; |
| |
| OperationLogRecycleChecker recycle_checker(instance_id_, txn_kv_.get(), instance_info_); |
| int init_res = recycle_checker.init(); |
| if (init_res != 0) { |
| LOG_WARNING("failed to initialize recycle checker").tag("error_code", init_res); |
| return init_res; |
| } |
| SnapshotDataSizeCalculator calculator(instance_id_, txn_kv_); |
| calculator.init(recycle_checker.get_snapshots()); |
| |
| auto scan_and_recycle_operation_log = [&](const std::string_view& key, |
| const std::vector<std::string>& raw_keys, |
| OperationLogPB operation_log) { |
| std::string_view log_key(key); |
| Versionstamp log_versionstamp; |
| if (!decode_versioned_key(&log_key, &log_versionstamp)) { |
| LOG_WARNING("failed to decode versionstamp from operation log key") |
| .tag("key", hex(key)); |
| return -1; |
| } |
| |
| size_t value_size = operation_log.ByteSizeLong(); |
| OperationLogReferenceInfo reference_info; |
| if (recycle_checker.can_recycle(log_versionstamp, operation_log.min_timestamp(), |
| &reference_info)) { |
| AnnotateTag tag("log_key", hex(key)); |
| int res = recycle_operation_log(log_versionstamp, raw_keys, std::move(operation_log)); |
| if (res != 0) { |
| LOG_WARNING("failed to recycle operation log").tag("error_code", res); |
| return res; |
| } |
| |
| recycled_operation_logs++; |
| recycled_operation_log_data_size += value_size; |
| } else { |
| int res = calculator.calculate_operation_log_data_size(key, operation_log, |
| reference_info); |
| if (res != 0) { |
| LOG_WARNING("failed to calculate operation log data size").tag("error_code", res); |
| return res; |
| } |
| } |
| |
| total_operation_logs++; |
| operation_log_data_size += value_size; |
| max_operation_log_data_size = std::max(max_operation_log_data_size, value_size); |
| return 0; |
| }; |
| |
| auto is_multi_version_status_changed = [&]() { |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| LOG_WARNING("failed to create transaction for checking multi-version status") |
| .tag("error_code", err); |
| return -1; |
| } |
| |
| std::string value; |
| err = txn->get(instance_key({instance_id_}), &value); |
| if (err != TxnErrorCode::TXN_OK) { |
| LOG_WARNING("failed to get instance info for checking multi-version status") |
| .tag("error_code", err); |
| return -1; |
| } |
| |
| InstanceInfoPB instance_info; |
| if (!instance_info.ParseFromString(value)) { |
| LOG_WARNING("failed to parse InstanceInfoPB").tag("value_size", value.size()); |
| return -1; |
| } |
| |
| if (!instance_info.has_multi_version_status() || |
| instance_info.multi_version_status() != instance_info_.multi_version_status()) { |
| LOG_WARNING("multi-version status changed for instance") |
| .tag("old_status", instance_info_.multi_version_status()) |
| .tag("new_status", instance_info.multi_version_status()); |
| return 1; // Indicate that the status has changed |
| } |
| return 0; |
| }; |
| |
| std::string log_key_prefix = versioned::log_key(instance_id_); |
| std::string begin_key = encode_versioned_key(log_key_prefix, Versionstamp::min()); |
| std::string end_key = encode_versioned_key(log_key_prefix, Versionstamp::max()); |
| |
| std::unique_ptr<BlobIterator> iter = blob_get_range(txn_kv_, begin_key, end_key); |
| for (size_t i = 0; iter->valid(); iter->next(), i++) { |
| std::string_view key = iter->key(); |
| OperationLogPB operation_log; |
| if (!iter->parse_value(&operation_log)) { |
| LOG_WARNING("failed to parse OperationLogPB from operation log key") |
| .tag("key", hex(key)); |
| return -1; |
| } |
| |
| int res = scan_and_recycle_operation_log(key, iter->raw_keys(), std::move(operation_log)); |
| if (res != 0) { |
| return res; |
| } |
| |
| if (i % 1000 == 0 && is_multi_version_status_changed() != 0) { |
| return -1; |
| } |
| } |
| if (iter->error_code() != TxnErrorCode::TXN_OK) { |
| LOG_WARNING("error occurred during scanning operation logs") |
| .tag("error_code", iter->error_code()); |
| return -1; |
| } |
| int res = calculator.save_snapshot_data_size_with_retry(); |
| if (res != 0) { |
| LOG_WARNING("failed to save snapshot data size").tag("error_code", res); |
| return res; |
| } |
| return 0; |
| } |
| |
| int InstanceRecycler::recycle_operation_log(Versionstamp log_version, |
| const std::vector<std::string>& raw_keys, |
| OperationLogPB operation_log) { |
| int recycle_log_count = 0; |
| OperationLogRecycler log_recycler(instance_id_, txn_kv_.get(), log_version, |
| operation_log.min_timestamp(), raw_keys); |
| RETURN_ON_FAILURE(log_recycler.begin()); |
| |
| #define RECYCLE_OPERATION_LOG(log_type, method_name) \ |
| do { \ |
| if (operation_log.has_##log_type()) { \ |
| int res = log_recycler.method_name(operation_log.log_type()); \ |
| if (res != 0) { \ |
| LOG_WARNING("failed to recycle " #log_type " log") \ |
| .tag("res", res) \ |
| .tag("log_version", log_version.to_string()); \ |
| return res; \ |
| } \ |
| recycle_log_count++; \ |
| } \ |
| } while (0) |
| |
| RECYCLE_OPERATION_LOG(commit_partition, recycle_commit_partition_log); |
| RECYCLE_OPERATION_LOG(drop_partition, recycle_drop_partition_log); |
| RECYCLE_OPERATION_LOG(commit_index, recycle_commit_index_log); |
| RECYCLE_OPERATION_LOG(drop_index, recycle_drop_index_log); |
| RECYCLE_OPERATION_LOG(update_tablet, recycle_update_tablet_log); |
| RECYCLE_OPERATION_LOG(compaction, recycle_compaction_log); |
| RECYCLE_OPERATION_LOG(schema_change, recycle_schema_change_log); |
| #undef RECYCLE_OPERATION_LOG |
| |
| if (operation_log.has_commit_txn()) { |
| const CommitTxnLogPB& commit_txn_log = operation_log.commit_txn(); |
| TxnInfoPB txn_info; |
| TxnErrorCode err = get_txn_info(txn_kv_.get(), instance_id_, commit_txn_log.db_id(), |
| commit_txn_log.txn_id(), &txn_info); |
| if (err != TxnErrorCode::TXN_OK) { |
| LOG_WARNING("failed to get TxnInfoPB for recycling commit txn log") |
| .tag("txn_id", commit_txn_log.txn_id()) |
| .tag("error_code", err); |
| return -1; |
| } |
| |
| if (txn_info.status() != TxnStatusPB::TXN_STATUS_VISIBLE) { |
| VLOG_DEBUG << "TxnInfoPB state is not VISIBLE, skip recycling commit txn log" |
| << ", txn_id: " << commit_txn_log.txn_id() |
| << ", state: " << TxnStatusPB_Name(txn_info.status()); |
| return 0; |
| } |
| |
| int res = log_recycler.recycle_commit_txn_log(commit_txn_log); |
| if (res != 0) { |
| LOG_WARNING("failed to recycle commit_txn log") |
| .tag("res", res) |
| .tag("txn_id", commit_txn_log.txn_id()) |
| .tag("log_version", log_version.to_string()); |
| return res; |
| } |
| |
| recycle_log_count++; |
| } |
| |
| if (recycle_log_count > 1) { |
| LOG_FATAL("recycle operation log count is more than 1") |
| .tag("recycle_log_count", recycle_log_count) |
| .tag("log_version", log_version.to_string()) |
| .tag("operation_log", operation_log.ShortDebugString()); |
| return -1; // This is an unexpected condition, should not happen |
| } |
| |
| return log_recycler.commit(); |
| } |
| |
| } // namespace doris::cloud |