| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "snapshot/snapshot_manager.h" |
| |
| #include <fmt/format.h> |
| |
| #include "common/logging.h" |
| #include "meta-store/keys.h" |
| #include "meta-store/versionstamp.h" |
| #include "recycler/checker.h" |
| #include "recycler/recycler.h" |
| |
| namespace doris::cloud { |
| |
| bool SnapshotManager::parse_snapshot_versionstamp(std::string_view snapshot_id, |
| Versionstamp* versionstamp) { |
| if (snapshot_id.size() != 20) { |
| return false; |
| } |
| |
| std::array<uint8_t, 10> versionstamp_data; |
| for (size_t i = 0; i < 10; ++i) { |
| const char* hex_chars = snapshot_id.data() + (i * 2); |
| |
| // Convert two hex digits to one byte more efficiently |
| uint8_t high_nibble = 0, low_nibble = 0; |
| |
| // Parse high nibble |
| if (hex_chars[0] >= '0' && hex_chars[0] <= '9') { |
| high_nibble = hex_chars[0] - '0'; |
| } else if (hex_chars[0] >= 'a' && hex_chars[0] <= 'f') { |
| high_nibble = hex_chars[0] - 'a' + 10; |
| } else if (hex_chars[0] >= 'A' && hex_chars[0] <= 'F') { |
| high_nibble = hex_chars[0] - 'A' + 10; |
| } else { |
| return false; |
| } |
| |
| // Parse low nibble |
| if (hex_chars[1] >= '0' && hex_chars[1] <= '9') { |
| low_nibble = hex_chars[1] - '0'; |
| } else if (hex_chars[1] >= 'a' && hex_chars[1] <= 'f') { |
| low_nibble = hex_chars[1] - 'a' + 10; |
| } else if (hex_chars[1] >= 'A' && hex_chars[1] <= 'F') { |
| low_nibble = hex_chars[1] - 'A' + 10; |
| } else { |
| return false; |
| } |
| |
| versionstamp_data[i] = (high_nibble << 4) | low_nibble; |
| } |
| |
| *versionstamp = Versionstamp(versionstamp_data); |
| return true; |
| } |
| |
| std::string SnapshotManager::serialize_snapshot_id(Versionstamp snapshot_versionstamp) { |
| return snapshot_versionstamp.to_string(); |
| } |
| |
| void SnapshotManager::begin_snapshot(std::string_view instance_id, |
| const BeginSnapshotRequest& request, |
| BeginSnapshotResponse* response) { |
| response->mutable_status()->set_code(MetaServiceCode::UNDEFINED_ERR); |
| response->mutable_status()->set_msg("Not implemented"); |
| } |
| |
| void SnapshotManager::update_snapshot(std::string_view instance_id, |
| const UpdateSnapshotRequest& request, |
| UpdateSnapshotResponse* response) { |
| response->mutable_status()->set_code(MetaServiceCode::UNDEFINED_ERR); |
| response->mutable_status()->set_msg("Not implemented"); |
| } |
| |
| void SnapshotManager::commit_snapshot(std::string_view instance_id, |
| const CommitSnapshotRequest& request, |
| CommitSnapshotResponse* response) { |
| response->mutable_status()->set_code(MetaServiceCode::UNDEFINED_ERR); |
| response->mutable_status()->set_msg("Not implemented"); |
| } |
| |
| void SnapshotManager::abort_snapshot(std::string_view instance_id, |
| const AbortSnapshotRequest& request, |
| AbortSnapshotResponse* response) { |
| response->mutable_status()->set_code(MetaServiceCode::UNDEFINED_ERR); |
| response->mutable_status()->set_msg("Not implemented"); |
| } |
| |
| void SnapshotManager::drop_snapshot(std::string_view instance_id, |
| const DropSnapshotRequest& request, |
| DropSnapshotResponse* response) { |
| response->mutable_status()->set_code(MetaServiceCode::UNDEFINED_ERR); |
| response->mutable_status()->set_msg("Not implemented"); |
| } |
| |
| void SnapshotManager::list_snapshot(std::string_view instance_id, |
| const ListSnapshotRequest& request, |
| ListSnapshotResponse* response) { |
| response->mutable_status()->set_code(MetaServiceCode::UNDEFINED_ERR); |
| response->mutable_status()->set_msg("Not implemented"); |
| } |
| |
| void SnapshotManager::clone_instance(const CloneInstanceRequest& request, |
| CloneInstanceResponse* response) { |
| response->mutable_status()->set_code(MetaServiceCode::UNDEFINED_ERR); |
| response->mutable_status()->set_msg("Not implemented"); |
| } |
| |
| std::pair<MetaServiceCode, std::string> SnapshotManager::compact_snapshot( |
| std::string_view instance_id) { |
| return {MetaServiceCode::UNDEFINED_ERR, "Not implemented"}; |
| } |
| |
| std::pair<MetaServiceCode, std::string> SnapshotManager::decouple_instance(std::string_view id) { |
| std::string instance_id(id); |
| LOG_INFO("decouple_instance").tag("instance_id", instance_id); |
| |
| // 1. Create transaction and get current instance info |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| return {MetaServiceCode::KV_TXN_CREATE_ERR, "failed to create txn"}; |
| } |
| |
| std::string key = instance_key({instance_id}); |
| std::string value; |
| err = txn->get(key, &value); |
| if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| return {MetaServiceCode::CLUSTER_NOT_FOUND, |
| fmt::format("instance not found, instance_id={}", instance_id)}; |
| } else if (err != TxnErrorCode::TXN_OK) { |
| return {MetaServiceCode::KV_TXN_GET_ERR, |
| fmt::format("failed to get instance info, instance_id={}, err={}", instance_id, |
| err)}; |
| } |
| |
| InstanceInfoPB instance; |
| if (!instance.ParseFromString(value)) { |
| return {MetaServiceCode::PROTOBUF_PARSE_ERR, "failed to parse instance info"}; |
| } |
| |
| // 2. Check the instance was created via clone_instance |
| if (!instance.has_source_instance_id() || instance.source_instance_id().empty() || |
| !instance.has_source_snapshot_id() || instance.source_snapshot_id().empty()) { |
| return {MetaServiceCode::INVALID_ARGUMENT, |
| fmt::format("instance {} was not a cloned instance (created via clone_instance)", |
| instance_id)}; |
| } |
| |
| // 3. Check snapshot_compact_status is SNAPSHOT_COMPACT_DONE |
| if (instance.snapshot_compact_status() != SnapshotCompactStatus::SNAPSHOT_COMPACT_DONE) { |
| return {MetaServiceCode::INVALID_ARGUMENT, |
| fmt::format("instance {} snapshot_compact_status is not SNAPSHOT_COMPACT_DONE, " |
| "current status={}", |
| instance_id, |
| SnapshotCompactStatus_Name(instance.snapshot_compact_status()))}; |
| } |
| |
| // 4. Remove the snapshot reference key in the source instance |
| const std::string& source_instance_id = instance.source_instance_id(); |
| const std::string& source_snapshot_id = instance.source_snapshot_id(); |
| |
| Versionstamp snapshot_versionstamp; |
| if (!parse_snapshot_versionstamp(source_snapshot_id, &snapshot_versionstamp)) { |
| return {MetaServiceCode::UNDEFINED_ERR, |
| fmt::format("failed to parse source_snapshot_id={} to versionstamp", |
| source_snapshot_id)}; |
| } |
| |
| versioned::SnapshotReferenceKeyInfo ref_key_info {source_instance_id, snapshot_versionstamp, |
| instance_id}; |
| std::string reference_key = versioned::snapshot_reference_key(ref_key_info); |
| txn->remove(reference_key); |
| |
| // 5. Clear source_snapshot_id and source_instance_id from the instance PB |
| instance.clear_source_snapshot_id(); |
| instance.clear_source_instance_id(); |
| |
| // 6. Persist the updated instance |
| std::string updated_val; |
| if (!instance.SerializeToString(&updated_val)) { |
| return {MetaServiceCode::PROTOBUF_SERIALIZE_ERR, |
| fmt::format("failed to serialize updated instance, instance_id={}", instance_id)}; |
| } |
| |
| txn->atomic_add(system_meta_service_instance_update_key(), 1); |
| txn->put(key, updated_val); |
| |
| err = txn->commit(); |
| if (err != TxnErrorCode::TXN_OK) { |
| return {MetaServiceCode::KV_TXN_COMMIT_ERR, |
| fmt::format("failed to commit txn, instance_id={}, err={}", instance_id, err)}; |
| } |
| |
| LOG_INFO("decouple_instance completed successfully") |
| .tag("instance_id", instance_id) |
| .tag("source_instance_id", source_instance_id) |
| .tag("source_snapshot_id", source_snapshot_id); |
| |
| return {MetaServiceCode::OK, ""}; |
| } |
| |
| std::pair<MetaServiceCode, std::string> SnapshotManager::set_multi_version_status( |
| std::string_view instance_id, MultiVersionStatus multi_version_status) { |
| return {MetaServiceCode::UNDEFINED_ERR, "Not implemented"}; |
| } |
| |
| int SnapshotManager::recycle_snapshots(InstanceRecycler* recycler) { |
| return 0; |
| } |
| |
| int SnapshotManager::check_snapshots(InstanceChecker* checker) { |
| return 0; |
| } |
| |
| int SnapshotManager::inverted_check_snapshots(InstanceChecker* checker) { |
| return 0; |
| } |
| |
| int SnapshotManager::check_mvcc_meta_key(InstanceChecker* checker) { |
| return 0; |
| } |
| |
| int SnapshotManager::inverted_check_mvcc_meta_key(InstanceChecker* checker) { |
| return 0; |
| } |
| |
| int SnapshotManager::check_meta(MetaChecker* meta_checker) { |
| return 0; |
| } |
| |
| int SnapshotManager::recycle_snapshot_meta_and_data(std::string_view instance_id, |
| std::string_view resource_id, |
| StorageVaultAccessor* accessor, |
| Versionstamp snapshot_version, |
| const SnapshotPB& snapshot_pb) { |
| return 0; |
| } |
| |
| int SnapshotManager::migrate_to_versioned_keys(InstanceDataMigrator* migrator) { |
| LOG(WARNING) << "Migrate to versioned keys is not implemented"; |
| return -1; |
| } |
| |
| int SnapshotManager::compact_snapshot_chains(InstanceChainCompactor* compactor) { |
| LOG(WARNING) << "Compact snapshot chains is not implemented"; |
| return -1; |
| } |
| |
| } // namespace doris::cloud |