blob: ac2b9e22d442f1a647f6cc0edb1dcca984018bc6 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "snapshot/snapshot_manager.h"
#include <fmt/format.h>
#include "common/logging.h"
#include "meta-store/keys.h"
#include "meta-store/versionstamp.h"
#include "recycler/checker.h"
#include "recycler/recycler.h"
namespace doris::cloud {
bool SnapshotManager::parse_snapshot_versionstamp(std::string_view snapshot_id,
Versionstamp* versionstamp) {
if (snapshot_id.size() != 20) {
return false;
}
std::array<uint8_t, 10> versionstamp_data;
for (size_t i = 0; i < 10; ++i) {
const char* hex_chars = snapshot_id.data() + (i * 2);
// Convert two hex digits to one byte more efficiently
uint8_t high_nibble = 0, low_nibble = 0;
// Parse high nibble
if (hex_chars[0] >= '0' && hex_chars[0] <= '9') {
high_nibble = hex_chars[0] - '0';
} else if (hex_chars[0] >= 'a' && hex_chars[0] <= 'f') {
high_nibble = hex_chars[0] - 'a' + 10;
} else if (hex_chars[0] >= 'A' && hex_chars[0] <= 'F') {
high_nibble = hex_chars[0] - 'A' + 10;
} else {
return false;
}
// Parse low nibble
if (hex_chars[1] >= '0' && hex_chars[1] <= '9') {
low_nibble = hex_chars[1] - '0';
} else if (hex_chars[1] >= 'a' && hex_chars[1] <= 'f') {
low_nibble = hex_chars[1] - 'a' + 10;
} else if (hex_chars[1] >= 'A' && hex_chars[1] <= 'F') {
low_nibble = hex_chars[1] - 'A' + 10;
} else {
return false;
}
versionstamp_data[i] = (high_nibble << 4) | low_nibble;
}
*versionstamp = Versionstamp(versionstamp_data);
return true;
}
std::string SnapshotManager::serialize_snapshot_id(Versionstamp snapshot_versionstamp) {
return snapshot_versionstamp.to_string();
}
void SnapshotManager::begin_snapshot(std::string_view instance_id,
const BeginSnapshotRequest& request,
BeginSnapshotResponse* response) {
response->mutable_status()->set_code(MetaServiceCode::UNDEFINED_ERR);
response->mutable_status()->set_msg("Not implemented");
}
void SnapshotManager::update_snapshot(std::string_view instance_id,
const UpdateSnapshotRequest& request,
UpdateSnapshotResponse* response) {
response->mutable_status()->set_code(MetaServiceCode::UNDEFINED_ERR);
response->mutable_status()->set_msg("Not implemented");
}
void SnapshotManager::commit_snapshot(std::string_view instance_id,
const CommitSnapshotRequest& request,
CommitSnapshotResponse* response) {
response->mutable_status()->set_code(MetaServiceCode::UNDEFINED_ERR);
response->mutable_status()->set_msg("Not implemented");
}
void SnapshotManager::abort_snapshot(std::string_view instance_id,
const AbortSnapshotRequest& request,
AbortSnapshotResponse* response) {
response->mutable_status()->set_code(MetaServiceCode::UNDEFINED_ERR);
response->mutable_status()->set_msg("Not implemented");
}
void SnapshotManager::drop_snapshot(std::string_view instance_id,
const DropSnapshotRequest& request,
DropSnapshotResponse* response) {
response->mutable_status()->set_code(MetaServiceCode::UNDEFINED_ERR);
response->mutable_status()->set_msg("Not implemented");
}
void SnapshotManager::list_snapshot(std::string_view instance_id,
const ListSnapshotRequest& request,
ListSnapshotResponse* response) {
response->mutable_status()->set_code(MetaServiceCode::UNDEFINED_ERR);
response->mutable_status()->set_msg("Not implemented");
}
void SnapshotManager::clone_instance(const CloneInstanceRequest& request,
CloneInstanceResponse* response) {
response->mutable_status()->set_code(MetaServiceCode::UNDEFINED_ERR);
response->mutable_status()->set_msg("Not implemented");
}
std::pair<MetaServiceCode, std::string> SnapshotManager::compact_snapshot(
std::string_view instance_id) {
return {MetaServiceCode::UNDEFINED_ERR, "Not implemented"};
}
std::pair<MetaServiceCode, std::string> SnapshotManager::decouple_instance(std::string_view id) {
std::string instance_id(id);
LOG_INFO("decouple_instance").tag("instance_id", instance_id);
// 1. Create transaction and get current instance info
std::unique_ptr<Transaction> txn;
TxnErrorCode err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
return {MetaServiceCode::KV_TXN_CREATE_ERR, "failed to create txn"};
}
std::string key = instance_key({instance_id});
std::string value;
err = txn->get(key, &value);
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
return {MetaServiceCode::CLUSTER_NOT_FOUND,
fmt::format("instance not found, instance_id={}", instance_id)};
} else if (err != TxnErrorCode::TXN_OK) {
return {MetaServiceCode::KV_TXN_GET_ERR,
fmt::format("failed to get instance info, instance_id={}, err={}", instance_id,
err)};
}
InstanceInfoPB instance;
if (!instance.ParseFromString(value)) {
return {MetaServiceCode::PROTOBUF_PARSE_ERR, "failed to parse instance info"};
}
// 2. Check the instance was created via clone_instance
if (!instance.has_source_instance_id() || instance.source_instance_id().empty() ||
!instance.has_source_snapshot_id() || instance.source_snapshot_id().empty()) {
return {MetaServiceCode::INVALID_ARGUMENT,
fmt::format("instance {} was not a cloned instance (created via clone_instance)",
instance_id)};
}
// 3. Check snapshot_compact_status is SNAPSHOT_COMPACT_DONE
if (instance.snapshot_compact_status() != SnapshotCompactStatus::SNAPSHOT_COMPACT_DONE) {
return {MetaServiceCode::INVALID_ARGUMENT,
fmt::format("instance {} snapshot_compact_status is not SNAPSHOT_COMPACT_DONE, "
"current status={}",
instance_id,
SnapshotCompactStatus_Name(instance.snapshot_compact_status()))};
}
// 4. Remove the snapshot reference key in the source instance
const std::string& source_instance_id = instance.source_instance_id();
const std::string& source_snapshot_id = instance.source_snapshot_id();
Versionstamp snapshot_versionstamp;
if (!parse_snapshot_versionstamp(source_snapshot_id, &snapshot_versionstamp)) {
return {MetaServiceCode::UNDEFINED_ERR,
fmt::format("failed to parse source_snapshot_id={} to versionstamp",
source_snapshot_id)};
}
versioned::SnapshotReferenceKeyInfo ref_key_info {source_instance_id, snapshot_versionstamp,
instance_id};
std::string reference_key = versioned::snapshot_reference_key(ref_key_info);
txn->remove(reference_key);
// 5. Clear source_snapshot_id and source_instance_id from the instance PB
instance.clear_source_snapshot_id();
instance.clear_source_instance_id();
// 6. Persist the updated instance
std::string updated_val;
if (!instance.SerializeToString(&updated_val)) {
return {MetaServiceCode::PROTOBUF_SERIALIZE_ERR,
fmt::format("failed to serialize updated instance, instance_id={}", instance_id)};
}
txn->atomic_add(system_meta_service_instance_update_key(), 1);
txn->put(key, updated_val);
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
return {MetaServiceCode::KV_TXN_COMMIT_ERR,
fmt::format("failed to commit txn, instance_id={}, err={}", instance_id, err)};
}
LOG_INFO("decouple_instance completed successfully")
.tag("instance_id", instance_id)
.tag("source_instance_id", source_instance_id)
.tag("source_snapshot_id", source_snapshot_id);
return {MetaServiceCode::OK, ""};
}
std::pair<MetaServiceCode, std::string> SnapshotManager::set_multi_version_status(
std::string_view instance_id, MultiVersionStatus multi_version_status) {
return {MetaServiceCode::UNDEFINED_ERR, "Not implemented"};
}
int SnapshotManager::recycle_snapshots(InstanceRecycler* recycler) {
return 0;
}
int SnapshotManager::check_snapshots(InstanceChecker* checker) {
return 0;
}
int SnapshotManager::inverted_check_snapshots(InstanceChecker* checker) {
return 0;
}
int SnapshotManager::check_mvcc_meta_key(InstanceChecker* checker) {
return 0;
}
int SnapshotManager::inverted_check_mvcc_meta_key(InstanceChecker* checker) {
return 0;
}
int SnapshotManager::check_meta(MetaChecker* meta_checker) {
return 0;
}
int SnapshotManager::recycle_snapshot_meta_and_data(std::string_view instance_id,
std::string_view resource_id,
StorageVaultAccessor* accessor,
Versionstamp snapshot_version,
const SnapshotPB& snapshot_pb) {
return 0;
}
int SnapshotManager::migrate_to_versioned_keys(InstanceDataMigrator* migrator) {
LOG(WARNING) << "Migrate to versioned keys is not implemented";
return -1;
}
int SnapshotManager::compact_snapshot_chains(InstanceChainCompactor* compactor) {
LOG(WARNING) << "Compact snapshot chains is not implemented";
return -1;
}
} // namespace doris::cloud