blob: 2ba49d2adde7ab108781bc3d4a45a388e13e1834 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "snapshot/snapshot_manager.h"
#include <fmt/format.h>
#include "common/logging.h"
#include "meta-service/meta_service_helper.h"
#include "meta-store/keys.h"
#include "meta-store/meta_reader.h"
#include "meta-store/versionstamp.h"
#include "recycler/checker.h"
#include "recycler/recycler.h"
namespace doris::cloud {
// Decodes a snapshot id written as exactly 20 hexadecimal characters (upper or
// lower case digits are both accepted) into a Versionstamp built from the 10
// decoded bytes.
//
// @param snapshot_id   textual id to decode
// @param versionstamp  output; assigned only when decoding succeeds
// @return true on success; false when the length is not 20 or when any
//         character is not a hex digit (*versionstamp is then left untouched)
bool SnapshotManager::parse_snapshot_versionstamp(std::string_view snapshot_id,
                                                  Versionstamp* versionstamp) {
    constexpr size_t kByteCount = 10;
    constexpr size_t kHexLength = kByteCount * 2;
    if (snapshot_id.size() != kHexLength) {
        return false;
    }

    // Maps one hex character to its value in [0, 15]; yields -1 for anything else.
    auto hex_value = [](char c) -> int {
        if (c >= '0' && c <= '9') {
            return c - '0';
        }
        if (c >= 'a' && c <= 'f') {
            return c - 'a' + 10;
        }
        if (c >= 'A' && c <= 'F') {
            return c - 'A' + 10;
        }
        return -1;
    };

    std::array<uint8_t, 10> bytes;
    for (size_t idx = 0; idx < kByteCount; ++idx) {
        // Each output byte is encoded as two consecutive characters: high nibble first.
        const int hi = hex_value(snapshot_id[idx * 2]);
        if (hi < 0) {
            return false;
        }
        const int lo = hex_value(snapshot_id[idx * 2 + 1]);
        if (lo < 0) {
            return false;
        }
        bytes[idx] = static_cast<uint8_t>((hi << 4) | lo);
    }

    *versionstamp = Versionstamp(bytes);
    return true;
}
// Produces the textual snapshot id for a versionstamp by delegating to
// Versionstamp::to_string().
// NOTE(review): presumably the inverse of parse_snapshot_versionstamp (20 hex
// characters) -- confirm against Versionstamp::to_string().
std::string SnapshotManager::serialize_snapshot_id(Versionstamp snapshot_versionstamp) {
    std::string snapshot_id = snapshot_versionstamp.to_string();
    return snapshot_id;
}
// Stub: performs no work. `instance_id` and `request` are ignored and the
// response status is always set to UNDEFINED_ERR / "Not implemented".
void SnapshotManager::begin_snapshot(std::string_view instance_id,
                                     const BeginSnapshotRequest& request,
                                     BeginSnapshotResponse* response) {
    auto* status = response->mutable_status();
    status->set_code(MetaServiceCode::UNDEFINED_ERR);
    status->set_msg("Not implemented");
}
// Stub: performs no work. `instance_id` and `request` are ignored and the
// response status is always set to UNDEFINED_ERR / "Not implemented".
void SnapshotManager::update_snapshot(std::string_view instance_id,
                                      const UpdateSnapshotRequest& request,
                                      UpdateSnapshotResponse* response) {
    auto* status = response->mutable_status();
    status->set_code(MetaServiceCode::UNDEFINED_ERR);
    status->set_msg("Not implemented");
}
// Stub: performs no work. `instance_id` and `request` are ignored and the
// response status is always set to UNDEFINED_ERR / "Not implemented".
void SnapshotManager::commit_snapshot(std::string_view instance_id,
                                      const CommitSnapshotRequest& request,
                                      CommitSnapshotResponse* response) {
    auto* status = response->mutable_status();
    status->set_code(MetaServiceCode::UNDEFINED_ERR);
    status->set_msg("Not implemented");
}
// Stub: performs no work. `instance_id` and `request` are ignored and the
// response status is always set to UNDEFINED_ERR / "Not implemented".
void SnapshotManager::abort_snapshot(std::string_view instance_id,
                                     const AbortSnapshotRequest& request,
                                     AbortSnapshotResponse* response) {
    auto* status = response->mutable_status();
    status->set_code(MetaServiceCode::UNDEFINED_ERR);
    status->set_msg("Not implemented");
}
// Stub: performs no work. `instance_id` and `request` are ignored and the
// response status is always set to UNDEFINED_ERR / "Not implemented".
void SnapshotManager::drop_snapshot(std::string_view instance_id,
                                    const DropSnapshotRequest& request,
                                    DropSnapshotResponse* response) {
    auto* status = response->mutable_status();
    status->set_code(MetaServiceCode::UNDEFINED_ERR);
    status->set_msg("Not implemented");
}
// Stub: performs no work. `instance_id` and `request` are ignored and the
// response status is always set to UNDEFINED_ERR / "Not implemented".
void SnapshotManager::list_snapshot(std::string_view instance_id,
                                    const ListSnapshotRequest& request,
                                    ListSnapshotResponse* response) {
    auto* status = response->mutable_status();
    status->set_code(MetaServiceCode::UNDEFINED_ERR);
    status->set_msg("Not implemented");
}
// Stub: performs no work. `request` is ignored and the response status is
// always set to UNDEFINED_ERR / "Not implemented".
void SnapshotManager::clone_instance(const CloneInstanceRequest& request,
                                     CloneInstanceResponse* response) {
    auto* status = response->mutable_status();
    status->set_code(MetaServiceCode::UNDEFINED_ERR);
    status->set_msg("Not implemented");
}
std::pair<MetaServiceCode, std::string> SnapshotManager::compact_snapshot(
std::string_view instance_id) {
return {MetaServiceCode::UNDEFINED_ERR, "Not implemented"};
}
std::pair<MetaServiceCode, std::string> SnapshotManager::decouple_instance(std::string_view id) {
std::string instance_id(id);
LOG_INFO("decouple_instance").tag("instance_id", instance_id);
// 1. Create transaction and get current instance info
std::unique_ptr<Transaction> txn;
TxnErrorCode err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
return {MetaServiceCode::KV_TXN_CREATE_ERR, "failed to create txn"};
}
std::string key = instance_key({instance_id});
std::string value;
err = txn->get(key, &value);
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
return {MetaServiceCode::CLUSTER_NOT_FOUND,
fmt::format("instance not found, instance_id={}", instance_id)};
} else if (err != TxnErrorCode::TXN_OK) {
return {MetaServiceCode::KV_TXN_GET_ERR,
fmt::format("failed to get instance info, instance_id={}, err={}", instance_id,
err)};
}
InstanceInfoPB instance;
if (!instance.ParseFromString(value)) {
return {MetaServiceCode::PROTOBUF_PARSE_ERR, "failed to parse instance info"};
}
// 2. Check the instance was created via clone_instance
if (!instance.has_source_instance_id() || instance.source_instance_id().empty() ||
!instance.has_source_snapshot_id() || instance.source_snapshot_id().empty()) {
return {MetaServiceCode::INVALID_ARGUMENT,
fmt::format("instance {} was not a cloned instance (created via clone_instance)",
instance_id)};
}
// 3. Check snapshot_compact_status is SNAPSHOT_COMPACT_DONE
if (instance.snapshot_compact_status() != SnapshotCompactStatus::SNAPSHOT_COMPACT_DONE) {
return {MetaServiceCode::INVALID_ARGUMENT,
fmt::format("instance {} snapshot_compact_status is not SNAPSHOT_COMPACT_DONE, "
"current status={}",
instance_id,
SnapshotCompactStatus_Name(instance.snapshot_compact_status()))};
}
// 4. Remove the snapshot reference key in the source instance
const std::string& source_instance_id = instance.source_instance_id();
const std::string& source_snapshot_id = instance.source_snapshot_id();
Versionstamp snapshot_versionstamp;
if (!parse_snapshot_versionstamp(source_snapshot_id, &snapshot_versionstamp)) {
return {MetaServiceCode::UNDEFINED_ERR,
fmt::format("failed to parse source_snapshot_id={} to versionstamp",
source_snapshot_id)};
}
versioned::SnapshotReferenceKeyInfo ref_key_info {source_instance_id, snapshot_versionstamp,
instance_id};
std::string reference_key = versioned::snapshot_reference_key(ref_key_info);
txn->remove(reference_key);
// 5. Clear source_snapshot_id and source_instance_id from the instance PB
instance.clear_source_snapshot_id();
instance.clear_source_instance_id();
// 6. Persist the updated instance
std::string updated_val;
if (!instance.SerializeToString(&updated_val)) {
return {MetaServiceCode::PROTOBUF_SERIALIZE_ERR,
fmt::format("failed to serialize updated instance, instance_id={}", instance_id)};
}
txn->atomic_add(system_meta_service_instance_update_key(), 1);
txn->put(key, updated_val);
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
return {MetaServiceCode::KV_TXN_COMMIT_ERR,
fmt::format("failed to commit txn, instance_id={}, err={}", instance_id, err)};
}
LOG_INFO("decouple_instance completed successfully")
.tag("instance_id", instance_id)
.tag("source_instance_id", source_instance_id)
.tag("source_snapshot_id", source_snapshot_id);
return {MetaServiceCode::OK, ""};
}
std::pair<MetaServiceCode, std::string> SnapshotManager::set_multi_version_status(
std::string_view instance_id, MultiVersionStatus multi_version_status) {
return {MetaServiceCode::UNDEFINED_ERR, "Not implemented"};
}
// No-op: ignores `recycler` and always returns 0.
// NOTE(review): 0 presumably signals success to the caller -- confirm.
int SnapshotManager::recycle_snapshots(InstanceRecycler* recycler) {
    constexpr int kResult = 0;
    return kResult;
}
// No-op: ignores `checker` and always returns 0.
// NOTE(review): 0 presumably signals success to the caller -- confirm.
int SnapshotManager::check_snapshots(InstanceChecker* checker) {
    constexpr int kResult = 0;
    return kResult;
}
// No-op: ignores `checker` and always returns 0.
// NOTE(review): 0 presumably signals success to the caller -- confirm.
int SnapshotManager::inverted_check_snapshots(InstanceChecker* checker) {
    constexpr int kResult = 0;
    return kResult;
}
// No-op: ignores `checker` and always returns 0.
// NOTE(review): 0 presumably signals success to the caller -- confirm.
int SnapshotManager::check_mvcc_meta_key(InstanceChecker* checker) {
    constexpr int kResult = 0;
    return kResult;
}
// No-op: ignores `checker` and always returns 0.
// NOTE(review): 0 presumably signals success to the caller -- confirm.
int SnapshotManager::inverted_check_mvcc_meta_key(InstanceChecker* checker) {
    constexpr int kResult = 0;
    return kResult;
}
// No-op: ignores `meta_checker` and always returns 0.
// NOTE(review): 0 presumably signals success to the caller -- confirm.
int SnapshotManager::check_meta(MetaChecker* meta_checker) {
    constexpr int kResult = 0;
    return kResult;
}
// No-op: none of the arguments are used and the function always returns 0.
// NOTE(review): 0 presumably signals success to the caller -- confirm.
int SnapshotManager::recycle_snapshot_meta_and_data(std::string_view instance_id,
                                                    std::string_view resource_id,
                                                    StorageVaultAccessor* accessor,
                                                    Versionstamp snapshot_version,
                                                    const SnapshotPB& snapshot_pb) {
    constexpr int kResult = 0;
    return kResult;
}
// Stub: ignores `migrator`, logs a warning that the operation is not
// implemented, and always returns -1.
int SnapshotManager::migrate_to_versioned_keys(InstanceDataMigrator* migrator) {
    constexpr int kNotImplemented = -1;
    LOG(WARNING) << "Migrate to versioned keys is not implemented";
    return kNotImplemented;
}
// Stub: ignores `compactor`, logs a warning that the operation is not
// implemented, and always returns -1.
int SnapshotManager::compact_snapshot_chains(InstanceChainCompactor* compactor) {
    constexpr int kNotImplemented = -1;
    LOG(WARNING) << "Compact snapshot chains is not implemented";
    return kNotImplemented;
}
// Reads the value stored under the instance key of `instance_id` and decodes
// it into `instance_info`.
//
// @param txn            transaction used for the read
// @param instance_id    id of the instance to look up
// @param instance_info  output; filled by ParseFromString on the stored value
// @return {OK, ""} on success;
//         {INVALID_ARGUMENT, msg} when the key is absent or the value cannot
//         be parsed;
//         {cast_as<ErrCategory::READ>(err), msg} for any other read error.
static std::pair<MetaServiceCode, std::string> get_instance(Transaction* txn,
                                                            std::string_view instance_id,
                                                            InstanceInfoPB* instance_info) {
    InstanceKeyInfo key_info {instance_id};
    std::string encoded_key = instance_key(key_info);

    std::string serialized;
    TxnErrorCode read_err = txn->get(encoded_key, &serialized);
    if (read_err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
        return {MetaServiceCode::INVALID_ARGUMENT,
                fmt::format("instance not found, instance_id={}", instance_id)};
    }
    if (read_err != TxnErrorCode::TXN_OK) {
        return {cast_as<ErrCategory::READ>(read_err),
                fmt::format("failed to get instance, instance_id={}, err={}", instance_id,
                            read_err)};
    }

    const bool parsed = instance_info->ParseFromString(serialized);
    if (!parsed) {
        return {MetaServiceCode::INVALID_ARGUMENT, "failed to parse instance info"};
    }
    return {MetaServiceCode::OK, ""};
}
// Collects snapshots for `instance_id` and for the instances that precede it
// in its chain.
//
// The walk starts at the instance's original_instance_id when that field is
// set and non-empty (otherwise at `instance_id` itself) and follows
// successor_instance_id links until `instance_id` is reached.
//
//  - If `required_snapshot_id` is empty, every visited instance's snapshots are
//    appended to `snapshots` via MetaReader::get_snapshots.
//  - Otherwise the id is parsed as a versionstamp and each visited instance is
//    probed for that single snapshot; the first hit is appended and the
//    function returns immediately. If no instance has it, OK is returned and
//    nothing is appended.
//
// @param txn                   transaction used for all reads
// @param instance_id           instance whose snapshot chain is inspected
// @param required_snapshot_id  optional snapshot id to look up (empty = all)
// @param snapshots             output list the results are appended to
// @return {OK, ""} on success; {INVALID_ARGUMENT, msg} for a malformed
//         snapshot id, a missing successor link or a cycle in the chain; the
//         error returned by get_instance or a cast read error otherwise.
std::pair<MetaServiceCode, std::string> SnapshotManager::get_all_snapshots(
        Transaction* txn, std::string_view instance_id, std::string_view required_snapshot_id,
        std::vector<std::pair<SnapshotPB, Versionstamp>>* snapshots) {
    const bool single_lookup = !required_snapshot_id.empty();
    Versionstamp wanted_versionstamp;
    if (single_lookup &&
        !parse_snapshot_versionstamp(required_snapshot_id, &wanted_versionstamp)) {
        return {MetaServiceCode::INVALID_ARGUMENT, "invalid snapshot_id format"};
    }

    InstanceInfoPB instance_info;
    std::pair<MetaServiceCode, std::string> lookup = get_instance(txn, instance_id, &instance_info);
    if (lookup.first != MetaServiceCode::OK) {
        return {lookup.first, lookup.second};
    }

    // Start from the earliest instance of the chain (used for rollback) when known.
    std::string cursor(instance_id);
    if (instance_info.has_original_instance_id() && !instance_info.original_instance_id().empty()) {
        cursor = instance_info.original_instance_id();
    }

    std::unordered_set<std::string> seen;
    while (true) {
        MetaReader meta_reader(cursor);
        if (single_lookup) {
            SnapshotPB snapshot_pb;
            TxnErrorCode err = meta_reader.get_snapshot(txn, wanted_versionstamp, &snapshot_pb);
            if (err == TxnErrorCode::TXN_OK) {
                snapshots->emplace_back(snapshot_pb, wanted_versionstamp);
                return {MetaServiceCode::OK, ""};
            }
            // A missing key only means this instance does not hold the snapshot.
            if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
                return {cast_as<ErrCategory::READ>(err), "failed to get snapshot"};
            }
        } else {
            TxnErrorCode err = meta_reader.get_snapshots(txn, snapshots);
            if (err != TxnErrorCode::TXN_OK) {
                return {cast_as<ErrCategory::READ>(err), "failed to get snapshots"};
            }
        }

        // The requested instance is the last link of the chain.
        if (cursor == instance_id) {
            break;
        }

        std::pair<MetaServiceCode, std::string> ancestor =
                get_instance(txn, cursor, &instance_info);
        if (ancestor.first != MetaServiceCode::OK) {
            std::string msg = fmt::format("failed to get ancestor instance {}: {}", cursor,
                                          ancestor.second);
            LOG_WARNING(msg);
            return {ancestor.first, msg};
        }

        const bool has_successor = instance_info.has_successor_instance_id() &&
                                   !instance_info.successor_instance_id().empty();
        if (!has_successor) {
            std::string msg = fmt::format(
                    "successor_instance_id is empty for current instance_id={}, instance_id={}",
                    cursor, instance_id);
            LOG_WARNING(msg);
            return {MetaServiceCode::INVALID_ARGUMENT, msg};
        }

        // insert() reports whether the id was new; a repeat means the chain loops.
        if (!seen.insert(cursor).second) {
            std::string msg = fmt::format(
                    "cycle detected in instance chain, current instance_id={}, instance_id={}",
                    cursor, instance_id);
            LOG_WARNING(msg);
            return {MetaServiceCode::INVALID_ARGUMENT, msg};
        }
        cursor = instance_info.successor_instance_id();
    }
    return {MetaServiceCode::OK, ""};
}
} // namespace doris::cloud