| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include <brpc/channel.h> |
| #include <butil/guid.h> |
| #include <fmt/core.h> |
| #include <gen_cpp/cloud.pb.h> |
| |
| #include <algorithm> |
| #include <cctype> |
| #include <charconv> |
| #include <chrono> |
| #include <numeric> |
| #include <queue> |
| #include <regex> |
| #include <string> |
| #include <tuple> |
| #include <unordered_set> |
| |
| #include "common/bvars.h" |
| #include "common/config.h" |
| #include "common/encryption_util.h" |
| #include "common/logging.h" |
| #include "common/network_util.h" |
| #include "common/stats.h" |
| #include "common/string_util.h" |
| #include "cpp/sync_point.h" |
| #include "meta-service/meta_service.h" |
| #include "meta-service/meta_service_helper.h" |
| #include "meta-store/keys.h" |
| #include "meta-store/meta_reader.h" |
| #include "meta-store/txn_kv.h" |
| #include "meta-store/txn_kv_error.h" |
| |
| using namespace std::chrono; |
| |
namespace {
constexpr char pattern_str[] = "^[a-zA-Z][0-9a-zA-Z_]*$";

// Returns true iff `str` is a valid vault name: a letter followed by any
// number of letters, digits or underscores.
bool is_valid_storage_vault_name(const std::string& str) {
    // Compile the regex once: std::regex construction is expensive, and
    // function-local static initialization is thread-safe since C++11.
    static const std::regex pattern(pattern_str);
    return std::regex_match(str, pattern);
}
} // namespace
| |
| namespace doris::cloud { |
| |
| static std::string_view print_cluster_status(const ClusterStatus& status) { |
| switch (status) { |
| case ClusterStatus::UNKNOWN: |
| return "UNKNOWN"; |
| case ClusterStatus::NORMAL: |
| return "NORMAL"; |
| case ClusterStatus::SUSPENDED: |
| return "SUSPENDED"; |
| case ClusterStatus::TO_RESUME: |
| return "TO_RESUME"; |
| case ClusterStatus::MANUAL_SHUTDOWN: |
| return "MANUAL_SHUTDOWN"; |
| default: |
| return "UNKNOWN"; |
| } |
| } |
| |
| static int encrypt_ak_sk_helper(const std::string plain_ak, const std::string plain_sk, |
| EncryptionInfoPB* encryption_info, AkSkPair* cipher_ak_sk_pair, |
| MetaServiceCode& code, std::string& msg) { |
| std::string key; |
| int64_t key_id; |
| LOG_INFO("enter encrypt_ak_sk_helper, plain_ak {}", hide_access_key(plain_ak)); |
| int ret = get_newest_encryption_key_for_ak_sk(&key_id, &key); |
| TEST_SYNC_POINT_CALLBACK("encrypt_ak_sk:get_encryption_key", &ret, &key, &key_id); |
| if (ret != 0) { |
| msg = "failed to get encryption key"; |
| code = MetaServiceCode::ERR_ENCRYPT; |
| LOG(WARNING) << msg; |
| return -1; |
| } |
| auto& encryption_method = get_encryption_method_for_ak_sk(); |
| AkSkPair plain_ak_sk_pair {plain_ak, plain_sk}; |
| ret = encrypt_ak_sk(plain_ak_sk_pair, encryption_method, key, cipher_ak_sk_pair); |
| if (ret != 0) { |
| msg = "failed to encrypt"; |
| code = MetaServiceCode::ERR_ENCRYPT; |
| LOG(WARNING) << msg; |
| return -1; |
| } |
| encryption_info->set_key_id(key_id); |
| encryption_info->set_encryption_method(encryption_method); |
| return 0; |
| } |
| |
| static int decrypt_ak_sk_helper(std::string_view cipher_ak, std::string_view cipher_sk, |
| const EncryptionInfoPB& encryption_info, AkSkPair* plain_ak_sk_pair, |
| MetaServiceCode& code, std::string& msg) { |
| int ret = decrypt_ak_sk_helper(cipher_ak, cipher_sk, encryption_info, plain_ak_sk_pair); |
| if (ret != 0) { |
| msg = "failed to decrypt"; |
| code = MetaServiceCode::ERR_DECPYPT; |
| } |
| return ret; |
| } |
| |
// Decrypt all cipher AK/SK pairs stored in `instance` in place: obj_info
// entries, ram_user, iam_user, and each stage's obj_info. The iam_user is
// loaded from the meta store via `txn`; when the arn info key is absent the
// values from config are used instead (compatibility path).
// Returns 0 on success, -1 on failure with `code`/`msg` set.
int decrypt_instance_info(InstanceInfoPB& instance, const std::string& instance_id,
                          MetaServiceCode& code, std::string& msg,
                          std::shared_ptr<Transaction>& txn) {
    // Object store credentials (only entries that carry encryption_info).
    for (auto& obj_info : *instance.mutable_obj_info()) {
        if (obj_info.has_encryption_info()) {
            AkSkPair plain_ak_sk_pair;
            int ret = decrypt_ak_sk_helper(obj_info.ak(), obj_info.sk(), obj_info.encryption_info(),
                                           &plain_ak_sk_pair, code, msg);
            if (ret != 0) return -1;
            obj_info.set_ak(std::move(plain_ak_sk_pair.first));
            obj_info.set_sk(std::move(plain_ak_sk_pair.second));
        }
    }
    // RAM user credentials, if present and encrypted.
    if (instance.has_ram_user() && instance.ram_user().has_encryption_info()) {
        auto& ram_user = *instance.mutable_ram_user();
        AkSkPair plain_ak_sk_pair;
        int ret = decrypt_ak_sk_helper(ram_user.ak(), ram_user.sk(), ram_user.encryption_info(),
                                       &plain_ak_sk_pair, code, msg);
        if (ret != 0) return -1;
        ram_user.set_ak(std::move(plain_ak_sk_pair.first));
        ram_user.set_sk(std::move(plain_ak_sk_pair.second));
    }

    // IAM user: prefer the arn info stored in the meta store, falling back to
    // config values when the key has never been written.
    std::string val;
    TxnErrorCode err = txn->get(system_meta_service_arn_info_key(), &val);
    if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
        // For compatibility, use arn_info of config
        RamUserPB iam_user;
        iam_user.set_user_id(config::arn_id);
        iam_user.set_external_id(instance_id);
        iam_user.set_ak(config::arn_ak);
        iam_user.set_sk(config::arn_sk);
        instance.mutable_iam_user()->CopyFrom(iam_user);
    } else if (err == TxnErrorCode::TXN_OK) {
        // Stored arn info exists: parse it, then decrypt its AK/SK.
        RamUserPB iam_user;
        if (!iam_user.ParseFromString(val)) {
            code = MetaServiceCode::PROTOBUF_PARSE_ERR;
            msg = "failed to parse RamUserPB";
            LOG(WARNING) << msg;
            return -1;
        }
        AkSkPair plain_ak_sk_pair;
        int ret = decrypt_ak_sk_helper(iam_user.ak(), iam_user.sk(), iam_user.encryption_info(),
                                       &plain_ak_sk_pair, code, msg);
        if (ret != 0) return -1;
        iam_user.set_ak(std::move(plain_ak_sk_pair.first));
        iam_user.set_sk(std::move(plain_ak_sk_pair.second));
        instance.mutable_iam_user()->CopyFrom(iam_user);
    } else {
        // Any other read error is fatal for this call.
        code = cast_as<ErrCategory::READ>(err);
        msg = fmt::format("failed to get arn_info_key, err={}", err);
        LOG(WARNING) << msg;
        return -1;
    }

    // Stage object store credentials.
    for (auto& stage : *instance.mutable_stages()) {
        if (stage.has_obj_info() && stage.obj_info().has_encryption_info()) {
            auto& obj_info = *stage.mutable_obj_info();
            AkSkPair plain_ak_sk_pair;
            int ret = decrypt_ak_sk_helper(obj_info.ak(), obj_info.sk(), obj_info.encryption_info(),
                                           &plain_ak_sk_pair, code, msg);
            if (ret != 0) return -1;
            obj_info.set_ak(std::move(plain_ak_sk_pair.first));
            obj_info.set_sk(std::move(plain_ak_sk_pair.second));
        }
    }
    return 0;
}
| |
| static int decrypt_and_update_ak_sk(ObjectStoreInfoPB& obj_info, MetaServiceCode& code, |
| std::string& msg) { |
| if (obj_info.has_encryption_info()) { |
| AkSkPair plain_ak_sk_pair; |
| if (int ret = decrypt_ak_sk_helper(obj_info.ak(), obj_info.sk(), obj_info.encryption_info(), |
| &plain_ak_sk_pair, code, msg); |
| ret != 0) { |
| return ret; |
| } |
| obj_info.set_ak(std::move(plain_ak_sk_pair.first)); |
| obj_info.set_sk(std::move(plain_ak_sk_pair.second)); |
| } |
| return 0; |
| }; |
| |
// Asynchronously notify refresh instance in background thread.
// `stats_handler`, when provided, receives the KVStats gathered during the
// notification after it completes.
static void async_notify_refresh_instance(
        std::shared_ptr<TxnKv> txn_kv, const std::string& instance_id, bool include_self,
        std::function<void(const KVStats&)> stats_handler = nullptr) {
    // Heap-allocate the closure; run_bthread_work presumably takes ownership
    // and frees it after execution -- TODO confirm against its definition.
    auto f = new std::function<void()>([instance_id, include_self, txn_kv = std::move(txn_kv),
                                        stats_handler = std::move(stats_handler)] {
        KVStats stats;
        notify_refresh_instance(txn_kv, instance_id, &stats, include_self);
        if (stats_handler) {
            stats_handler(stats);
        }
    });
    bthread_t bid;
    // If the background bthread cannot be started, fall back to running the
    // notification synchronously on the current thread.
    if (bthread_start_background(&bid, nullptr, run_bthread_work, f) != 0) {
        LOG(WARNING) << "notify refresh instance inplace, instance_id=" << instance_id;
        run_bthread_work(f);
    }
}
| |
// Collect the instance ids directly derived (via snapshot references) from source_instance_id;
// transitive descendants are gathered by the BFS in find_cascade_instances below
// Returns 0 on success, -1 on error
// Uses separate read-only transactions to avoid read conflicts with the main write transaction
static int collect_direct_derived_instances(TxnKv* txn_kv, const std::string& source_instance_id,
                                            std::vector<std::string>* derived_ids) {
    // The output vector is mandatory.
    if (!derived_ids) {
        return -1;
    }
    derived_ids->clear();

    if (source_instance_id.empty()) {
        return 0;
    }

    constexpr Versionstamp kMinVersionstamp = Versionstamp::min();

    // Scan the full snapshot-reference key range of this source instance.
    // The end key uses source_instance_id + '\x00' (the next possible string),
    // so the range covers every versionstamp of exactly this instance.
    std::string key_start =
            versioned::snapshot_reference_key_prefix(source_instance_id, kMinVersionstamp);
    std::string key_end =
            versioned::snapshot_reference_key_prefix(source_instance_id + '\x00', kMinVersionstamp);
    std::string current_start = key_start;

    std::unique_ptr<RangeGetIterator> it;
    while (true) {
        // A fresh read-only transaction per batch avoids read conflicts with
        // the caller's write transaction.
        std::unique_ptr<Transaction> read_txn;
        TxnErrorCode err = txn_kv->create_txn(&read_txn);
        if (err != TxnErrorCode::TXN_OK) {
            LOG(WARNING) << "failed to create read transaction for snapshot reference scan, err="
                         << err << " source_instance_id=" << source_instance_id;
            return -1;
        }

        err = read_txn->get(current_start, key_end, &it);
        if (err != TxnErrorCode::TXN_OK) {
            LOG(WARNING) << "failed to scan snapshot reference keys, err=" << err
                         << " source_instance_id=" << source_instance_id;
            return -1;
        }

        while (it->has_next()) {
            auto [key, value] = it->next();
            std::string derived_id;
            std::string_view key_view = key;
            // Undecodable keys are logged and skipped rather than failing the scan.
            if (versioned::decode_snapshot_ref_key(&key_view, nullptr, nullptr, &derived_id) &&
                !derived_id.empty()) {
                derived_ids->push_back(std::move(derived_id));
            } else {
                LOG(WARNING) << "failed to decode snapshot reference key for source_instance_id="
                             << source_instance_id << " key=" << hex(key);
            }
        }

        // Stop when the iterator reports no further batches.
        if (!it->more()) {
            break;
        }

        // Resume the next batch where this one left off.
        current_start = it->next_begin_key();
        if (current_start.empty() || current_start >= key_end) {
            break;
        }
    }

    return 0;
}
| |
| static int find_cascade_instances(TxnKv* txn_kv, const std::string& root_instance_id, |
| std::vector<std::string>* out) { |
| std::queue<std::string> to_visit; |
| std::unordered_set<std::string> visited; |
| std::vector<std::string> direct_children; |
| |
| to_visit.push(root_instance_id); |
| visited.insert(root_instance_id); |
| |
| while (!to_visit.empty()) { |
| std::string current_instance_id = to_visit.front(); |
| to_visit.pop(); |
| |
| if (collect_direct_derived_instances(txn_kv, current_instance_id, &direct_children) != 0) { |
| LOG(WARNING) << "failed to collect derived instances for source_instance_id=" |
| << current_instance_id; |
| return -1; |
| } |
| |
| for (auto& derived_id : direct_children) { |
| if (derived_id.empty() || visited.contains(derived_id)) { |
| continue; |
| } |
| |
| out->push_back(derived_id); |
| to_visit.push(derived_id); |
| visited.insert(derived_id); |
| LOG(INFO) << "found derived instance: " << derived_id |
| << " from source: " << current_instance_id; |
| } |
| } |
| |
| LOG(INFO) << "find_cascade_instances completed, found " << out->size() |
| << " instances to cascade from root: " << root_instance_id; |
| return 0; |
| } |
| |
| // Helper function to update AK/SK for a single instance |
| // Returns 0 on success, -1 on error |
| static int update_instance_ak_sk(InstanceInfoPB& instance, const UpdateAkSkRequest* request, |
| uint64_t time, MetaServiceCode& code, std::string& msg, |
| std::stringstream& update_record) { |
| // Update ram_user if has ram_user |
| if (request->has_ram_user()) { |
| if (request->ram_user().user_id().empty() || request->ram_user().ak().empty() || |
| request->ram_user().sk().empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "ram user info err " + proto_to_json(*request); |
| return -1; |
| } |
| if (!instance.has_ram_user()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "instance doesn't have ram user info"; |
| return -1; |
| } |
| auto& ram_user = request->ram_user(); |
| EncryptionInfoPB encryption_info; |
| AkSkPair cipher_ak_sk_pair; |
| if (encrypt_ak_sk_helper(ram_user.ak(), ram_user.sk(), &encryption_info, &cipher_ak_sk_pair, |
| code, msg) != 0) { |
| return -1; |
| } |
| const auto& [ak, sk] = cipher_ak_sk_pair; |
| auto& instance_ram_user = *instance.mutable_ram_user(); |
| if (ram_user.user_id() != instance_ram_user.user_id()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "ram user_id err"; |
| return -1; |
| } |
| std::string old_ak = instance_ram_user.ak(); |
| std::string old_sk = instance_ram_user.sk(); |
| if (old_ak == ak && old_sk == sk) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "ak sk eq original, please check it"; |
| return -1; |
| } |
| instance_ram_user.set_ak(std::move(cipher_ak_sk_pair.first)); |
| instance_ram_user.set_sk(std::move(cipher_ak_sk_pair.second)); |
| instance_ram_user.mutable_encryption_info()->CopyFrom(encryption_info); |
| update_record << "update ram_user's ak sk, instance_id: " << instance.instance_id() |
| << " user_id: " << ram_user.user_id() << " old: cipher ak: " << old_ak |
| << " cipher sk: " << old_sk << " new: cipher ak: " << ak |
| << " cipher sk: " << sk << "; "; |
| } |
| |
| // Update internal_bucket_user |
| bool has_found_alter_obj_info = false; |
| for (auto& alter_bucket_user : request->internal_bucket_user()) { |
| if (!alter_bucket_user.has_ak() || !alter_bucket_user.has_sk() || |
| !alter_bucket_user.has_user_id()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "s3 bucket info err " + proto_to_json(*request); |
| return -1; |
| } |
| std::string user_id = alter_bucket_user.user_id(); |
| EncryptionInfoPB encryption_info; |
| AkSkPair cipher_ak_sk_pair; |
| if (encrypt_ak_sk_helper(alter_bucket_user.ak(), alter_bucket_user.sk(), &encryption_info, |
| &cipher_ak_sk_pair, code, msg) != 0) { |
| return -1; |
| } |
| const auto& [ak, sk] = cipher_ak_sk_pair; |
| auto& obj_info = |
| const_cast<std::decay_t<decltype(instance.obj_info())>&>(instance.obj_info()); |
| for (auto& it : obj_info) { |
| std::string old_ak = it.ak(); |
| std::string old_sk = it.sk(); |
| if (!it.has_user_id()) { |
| has_found_alter_obj_info = true; |
| // For compatibility, obj_info without a user_id only allow |
| // single internal_bucket_user to modify it. |
| if (request->internal_bucket_user_size() != 1) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "fail to update old instance's obj_info, s3 obj info err " + |
| proto_to_json(*request); |
| return -1; |
| } |
| if (it.ak() == ak && it.sk() == sk) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "ak sk eq original, please check it"; |
| return -1; |
| } |
| it.set_mtime(time); |
| it.set_user_id(user_id); |
| it.set_ak(ak); |
| it.set_sk(sk); |
| it.mutable_encryption_info()->CopyFrom(encryption_info); |
| update_record << "update obj_info's ak sk without user_id, instance_id: " |
| << instance.instance_id() << " obj_info_id: " << it.id() |
| << " new user_id: " << user_id << " old: cipher ak: " << old_ak |
| << " cipher sk: " << old_sk << " new: cipher ak: " << ak |
| << " cipher sk: " << sk << "; "; |
| continue; |
| } |
| if (it.user_id() == user_id) { |
| has_found_alter_obj_info = true; |
| if (it.ak() == ak && it.sk() == sk) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "ak sk eq original, please check it"; |
| return -1; |
| } |
| it.set_mtime(time); |
| it.set_ak(ak); |
| it.set_sk(sk); |
| it.mutable_encryption_info()->CopyFrom(encryption_info); |
| update_record << "update obj_info's ak sk, instance_id: " << instance.instance_id() |
| << " obj_info_id: " << it.id() << " user_id: " << user_id |
| << " old: cipher ak: " << old_ak << " cipher sk: " << old_sk |
| << " new: cipher ak: " << ak << " cipher sk: " << sk << "; "; |
| } |
| } |
| } |
| |
| if (!request->internal_bucket_user().empty() && !has_found_alter_obj_info) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "fail to find the alter obj info"; |
| return -1; |
| } |
| |
| return 0; |
| } |
| |
void MetaServiceImpl::get_obj_store_info(google::protobuf::RpcController* controller,
                                         const GetObjStoreInfoRequest* request,
                                         GetObjStoreInfoResponse* response,
                                         ::google::protobuf::Closure* done) {
    // NOTE(review): RPC_PREPROCESS appears to declare the locals used below
    // (code, msg, ss, txn, instance_id, ...) -- confirm in meta_service_helper.h.
    RPC_PREPROCESS(get_obj_store_info, get);
    TEST_SYNC_POINT_CALLBACK("obj-store-info_sk_response", &response);
    TEST_SYNC_POINT_RETURN_WITH_VOID("obj-store-info_sk_response_return");
    // Prepare data
    std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
    if (cloud_unique_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "cloud unique id not set";
        return;
    }

    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
    if (instance_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "empty instance_id";
        LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id;
        return;
    }
    RPC_RATE_LIMIT(get_obj_store_info)
    // Read the instance meta record.
    InstanceKeyInfo key_info {instance_id};
    std::string key;
    std::string val;
    instance_key(key_info, &key);

    TxnErrorCode err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::CREATE>(err);
        msg = "failed to create txn";
        LOG(WARNING) << msg << " err=" << err;
        return;
    }
    err = txn->get(key, &val);
    LOG(INFO) << "get instance_key=" << hex(key);

    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::READ>(err);
        ss << "failed to get instance, instance_id=" << instance_id << " err=" << err;
        msg = ss.str();
        return;
    }

    InstanceInfoPB instance;
    if (!instance.ParseFromString(val)) {
        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
        msg = "failed to parse InstanceInfoPB";
        return;
    }
    // The caller needs plaintext AK/SK: decrypt each obj_info entry in place.
    for (auto& obj_info : *instance.mutable_obj_info()) {
        if (auto ret = decrypt_and_update_ak_sk(obj_info, code, msg); ret != 0) {
            return;
        }
    }

    response->set_enable_storage_vault(instance.enable_storage_vault());

    // Iterate all the resources to return to the rpc caller
    if (!instance.resource_ids().empty()) {
        std::string storage_vault_start = storage_vault_key({instance.instance_id(), ""});
        std::string storage_vault_end = storage_vault_key({instance.instance_id(), "\xff"});
        std::unique_ptr<RangeGetIterator> it;
        do {
            TxnErrorCode err = txn->get(storage_vault_start, storage_vault_end, &it);
            if (err != TxnErrorCode::TXN_OK) {
                code = cast_as<ErrCategory::READ>(err);
                msg = fmt::format("internal error, failed to get storage vault, err={}", err);
                LOG(WARNING) << msg;
                return;
            }

            while (it->has_next()) {
                auto [k, v] = it->next();
                auto* vault = response->add_storage_vault();
                if (!vault->ParseFromArray(v.data(), v.size())) {
                    code = MetaServiceCode::PROTOBUF_PARSE_ERR;
                    msg = fmt::format("malformed storage vault, unable to deserialize key={}",
                                      hex(k));
                    LOG(WARNING) << msg << " key=" << hex(k);
                    return;
                }
                // Remember the last key of this batch; the '\x00' appended
                // below makes the next range read resume just after it.
                if (!it->has_next()) {
                    storage_vault_start = k;
                }
            }
            storage_vault_start.push_back('\x00'); // Update to next smallest key for iteration
        } while (it->more());
    }
    // Vault object store credentials are stored encrypted too; decrypt them
    // before returning.
    for (auto& vault : *response->mutable_storage_vault()) {
        if (vault.has_obj_info()) {
            if (auto ret = decrypt_and_update_ak_sk(*vault.mutable_obj_info(), code, msg);
                ret != 0) {
                return;
            }
        }
    }

    response->mutable_obj_info()->CopyFrom(instance.obj_info());
    if (instance.has_default_storage_vault_id()) {
        response->set_default_storage_vault_id(instance.default_storage_vault_id());
        response->set_default_storage_vault_name(instance.default_storage_vault_name());
    }
}
| |
| // The next available vault id would be max(max(obj info id), max(vault id)) + 1. |
| static std::string next_available_vault_id(const InstanceInfoPB& instance) { |
| int vault_id = 0; |
| auto max = [](int prev, const auto& last) { |
| int last_id = 0; |
| std::string_view value = "0"; |
| if constexpr (std::is_same_v<std::decay_t<decltype(last)>, ObjectStoreInfoPB>) { |
| value = last.id(); |
| } else if constexpr (std::is_same_v<std::decay_t<decltype(last)>, std::string>) { |
| value = last; |
| } |
| if (auto [_, ec] = std::from_chars(value.data(), value.data() + value.size(), last_id); |
| ec != std::errc {}) [[unlikely]] { |
| LOG_WARNING("Invalid resource id format: {}", value); |
| last_id = 0; |
| DCHECK(false); |
| } |
| return std::max(prev, last_id); |
| }; |
| auto prev = std::accumulate( |
| instance.resource_ids().begin(), instance.resource_ids().end(), |
| std::accumulate(instance.obj_info().begin(), instance.obj_info().end(), vault_id, max), |
| max); |
| return std::to_string(prev + 1); |
| } |
| |
| namespace detail { |
| |
| // Validate and normalize hdfs prefix. Return true if prefix is valid. |
| bool normalize_hdfs_prefix(std::string& prefix) { |
| if (prefix.empty()) { |
| return true; |
| } |
| |
| if (prefix.find("://") != std::string::npos) { |
| // Should not contain scheme |
| return false; |
| } |
| |
| trim(prefix); |
| return true; |
| } |
| |
| // Validate and normalize hdfs fs_name. Return true if fs_name is valid. |
| bool normalize_hdfs_fs_name(std::string& fs_name) { |
| if (fs_name.empty()) { |
| return false; |
| } |
| |
| // Should check scheme existence? |
| trim(fs_name); |
| return !fs_name.empty(); |
| } |
| |
| } // namespace detail |
| |
// Validate `hdfs_param`, assign it a fresh vault id, persist it via `txn`,
// and register its id/name in `instance`.
// Returns 0 on success, -1 on error with `code`/`msg` set.
static int add_hdfs_storage_vault(InstanceInfoPB& instance, Transaction* txn,
                                  StorageVaultPB& hdfs_param, MetaServiceCode& code,
                                  std::string& msg) {
#ifndef ENABLE_HDFS_STORAGE_VAULT
    // HDFS support compiled out: every call fails here. The code below this
    // block is unreachable in that configuration but still compiled.
    code = MetaServiceCode::INVALID_ARGUMENT;
    msg = fmt::format(
            "HDFS is disabled (via the ENABLE_HDFS_STORAGE_VAULT build option), "
            "but HDFS storage vaults were detected: {}",
            hdfs_param.name());
    LOG(ERROR) << "HDFS is disabled (via the ENABLE_HDFS_STORAGE_VAULT build option), "
               << "but HDFS storage vaults were detected: " << hdfs_param.name();
    return -1;
#endif

    if (!hdfs_param.has_hdfs_info()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = fmt::format("vault_name={} passed invalid argument", hdfs_param.name());
        return -1;
    }

    using namespace detail;
    // Check and normalize hdfs conf
    auto* prefix = hdfs_param.mutable_hdfs_info()->mutable_prefix();
    if (!normalize_hdfs_prefix(*prefix)) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = fmt::format("invalid prefix: {}", *prefix);
        return -1;
    }
    // Optionally append a GUID so distinct vaults never share an hdfs path.
    if (config::enable_distinguish_hdfs_path) {
        auto uuid_suffix = butil::GenerateGUID();
        if (uuid_suffix.empty()) [[unlikely]] {
            code = MetaServiceCode::UNDEFINED_ERR;
            msg = fmt::format("failed to generate one suffix for hdfs prefix: {}", *prefix);
            return -1;
        }
        *prefix = fmt::format("{}_{}", *prefix, uuid_suffix);
    }

    auto* fs_name = hdfs_param.mutable_hdfs_info()->mutable_build_conf()->mutable_fs_name();
    if (!normalize_hdfs_fs_name(*fs_name)) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = fmt::format("invalid fs_name: {}", *fs_name);
        return -1;
    }

    // Persist the vault under a freshly assigned id and record id/name in the
    // instance's parallel arrays.
    std::string key;
    std::string vault_id = next_available_vault_id(instance);
    storage_vault_key({instance.instance_id(), vault_id}, &key);
    hdfs_param.set_id(vault_id);
    std::string val = hdfs_param.SerializeAsString();
    txn->put(key, val);
    LOG_INFO("try to put storage vault_id={}, vault_name={}, vault_key={}", vault_id,
             hdfs_param.name(), hex(key));
    instance.mutable_resource_ids()->Add(std::move(vault_id));
    *instance.mutable_storage_vault_names()->Add() = hdfs_param.name();
    return 0;
}
| |
// Validate an object store conf, encrypt its AK/SK (skipped when role_arn
// based credentials are used), and stamp id/ctime/mtime/sse flag on `obj`.
// On failure `code`/`msg` are set; `obj` may be left partially modified.
static void create_object_info_with_encrypt(const InstanceInfoPB& instance, ObjectStoreInfoPB* obj,
                                            bool sse_enabled, MetaServiceCode& code,
                                            std::string& msg) {
    std::string plain_ak = obj->has_ak() ? obj->ak() : "";
    std::string plain_sk = obj->has_sk() ? obj->sk() : "";
    std::string bucket = obj->has_bucket() ? obj->bucket() : "";
    std::string prefix = obj->has_prefix() ? obj->prefix() : "";
    // format prefix, such as `/aa/bb/`, `aa/bb//`, `//aa/bb`, ` /aa/bb` -> `aa/bb`
    trim(prefix);
    std::string endpoint = obj->has_endpoint() ? obj->endpoint() : "";
    std::string external_endpoint = obj->has_external_endpoint() ? obj->external_endpoint() : "";
    std::string region = obj->has_region() ? obj->region() : "";

    if (obj->has_role_arn()) {
        // Role-arn credentials: only S3 with INSTANCE_PROFILE provider is
        // accepted, and no AK/SK encryption is needed.
        if (obj->role_arn().empty() || !obj->has_cred_provider_type() ||
            obj->cred_provider_type() != CredProviderTypePB::INSTANCE_PROFILE ||
            !obj->has_provider() || obj->provider() != ObjectStoreInfoPB::S3 || bucket.empty() ||
            endpoint.empty() || region.empty()) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            msg = "s3 conf info err with role_arn, please check it";
            return;
        }
    } else {
        // ATTN: prefix may be empty
        if (plain_ak.empty() || plain_sk.empty() || bucket.empty() || endpoint.empty() ||
            region.empty() || !obj->has_provider() || external_endpoint.empty()) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            msg = "s3 conf info err, please check it";
            return;
        }

        // Replace plaintext AK/SK with ciphertext before anything is persisted.
        EncryptionInfoPB encryption_info;
        AkSkPair cipher_ak_sk_pair;
        auto ret = encrypt_ak_sk_helper(plain_ak, plain_sk, &encryption_info, &cipher_ak_sk_pair,
                                        code, msg);
        TEST_SYNC_POINT_CALLBACK("create_object_info_with_encrypt", &ret, &code, &msg);
        if (ret != 0) {
            return;
        }
        obj->set_ak(std::move(cipher_ak_sk_pair.first));
        obj->set_sk(std::move(cipher_ak_sk_pair.second));
        obj->mutable_encryption_info()->CopyFrom(encryption_info);
    }

    // Write back the normalized fields and bookkeeping metadata.
    obj->set_bucket(bucket);
    obj->set_prefix(prefix);
    obj->set_endpoint(endpoint);
    obj->set_external_endpoint(external_endpoint);
    obj->set_region(region);
    obj->set_id(next_available_vault_id(instance));
    auto now_time = std::chrono::system_clock::now();
    uint64_t time =
            std::chrono::duration_cast<std::chrono::seconds>(now_time.time_since_epoch()).count();
    obj->set_ctime(time);
    obj->set_mtime(time);
    obj->set_sse_enabled(sse_enabled);
}
| |
| static int add_vault_into_instance(InstanceInfoPB& instance, Transaction* txn, |
| StorageVaultPB& vault_param, MetaServiceCode& code, |
| std::string& msg) { |
| if (std::find_if(instance.storage_vault_names().begin(), instance.storage_vault_names().end(), |
| [&vault_param](const auto& name) { return name == vault_param.name(); }) != |
| instance.storage_vault_names().end()) { |
| code = MetaServiceCode::ALREADY_EXISTED; |
| msg = fmt::format("vault_name={} already created", vault_param.name()); |
| return -1; |
| } |
| |
| if (vault_param.has_hdfs_info()) { |
| return add_hdfs_storage_vault(instance, txn, vault_param, code, msg); |
| } |
| |
| create_object_info_with_encrypt(instance, vault_param.mutable_obj_info(), true, code, msg); |
| if (code != MetaServiceCode::OK) { |
| return -1; |
| } |
| |
| vault_param.mutable_obj_info()->CopyFrom(vault_param.obj_info()); |
| vault_param.set_id(vault_param.obj_info().id()); |
| auto vault_key = storage_vault_key({instance.instance_id(), vault_param.obj_info().id()}); |
| *instance.mutable_resource_ids()->Add() = vault_param.id(); |
| *instance.mutable_storage_vault_names()->Add() = vault_param.name(); |
| LOG_INFO("try to put storage vault_id={}, vault_name={}, vault_key={}", vault_param.id(), |
| vault_param.name(), hex(vault_key)); |
| txn->put(vault_key, vault_param.SerializeAsString()); |
| return 0; |
| } |
| |
| static int remove_hdfs_storage_vault(InstanceInfoPB& instance, Transaction* txn, |
| const StorageVaultPB& hdfs_info, MetaServiceCode& code, |
| std::string& msg) { |
| std::string_view vault_name = hdfs_info.name(); |
| auto name_iter = std::find_if(instance.storage_vault_names().begin(), |
| instance.storage_vault_names().end(), |
| [&](const auto& name) { return vault_name == name; }); |
| if (name_iter == instance.storage_vault_names().end()) { |
| code = MetaServiceCode::STORAGE_VAULT_NOT_FOUND; |
| msg = fmt::format("vault_name={} not found", vault_name); |
| return -1; |
| } |
| auto vault_idx = name_iter - instance.storage_vault_names().begin(); |
| auto vault_id_iter = instance.resource_ids().begin() + vault_idx; |
| std::string_view vault_id = *vault_id_iter; |
| std::string vault_key = storage_vault_key({instance.instance_id(), vault_id}); |
| |
| txn->remove(vault_key); |
| instance.mutable_storage_vault_names()->DeleteSubrange(vault_idx, 1); |
| instance.mutable_resource_ids()->DeleteSubrange(vault_idx, 1); |
| LOG(INFO) << "remove storage_vault_key=" << hex(vault_key); |
| |
| return 0; |
| } |
| |
| // Log vault message and origin default storage vault message for potential tracing |
| static void set_default_vault_log_helper(const InstanceInfoPB& instance, |
| std::string_view vault_name, std::string_view vault_id) { |
| auto vault_msg = fmt::format("instance {} tries to set default vault as {}, id {}", |
| instance.instance_id(), vault_id, vault_name); |
| if (instance.has_default_storage_vault_id()) { |
| vault_msg = fmt::format("{}, origin default vault name {}, vault id {}", vault_msg, |
| instance.default_storage_vault_name(), |
| instance.default_storage_vault_id()); |
| } |
| LOG(INFO) << vault_msg; |
| } |
| |
| static bool vault_exist(const InstanceInfoPB& instance, const std::string& new_vault_name) { |
| for (auto& name : instance.storage_vault_names()) { |
| if (new_vault_name == name) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| static int alter_hdfs_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Transaction>& txn, |
| const StorageVaultPB& vault, MetaServiceCode& code, |
| std::string& msg, AlterObjStoreInfoResponse* response) { |
| if (!vault.has_hdfs_info()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| std::stringstream ss; |
| ss << "There is no hdfs vault provided"; |
| msg = ss.str(); |
| return -1; |
| } |
| const auto& hdfs_info = vault.hdfs_info(); |
| if (hdfs_info.has_prefix() || !hdfs_info.has_build_conf() || |
| hdfs_info.build_conf().has_fs_name()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| std::stringstream ss; |
| ss << "You can not alter prefix or fs name because it might lose previoud written data"; |
| msg = ss.str(); |
| return -1; |
| } |
| const auto& name = vault.name(); |
| // Here we try to get mutable iter since we might need to alter the vault name |
| auto name_itr = std::find_if(instance.mutable_storage_vault_names()->begin(), |
| instance.mutable_storage_vault_names()->end(), |
| [&](const auto& vault_name) { return name == vault_name; }); |
| if (name_itr == instance.storage_vault_names().end()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| std::stringstream ss; |
| ss << "invalid storage vault name, not found, name =" << name; |
| msg = ss.str(); |
| return -1; |
| } |
| auto pos = name_itr - instance.storage_vault_names().begin(); |
| std::string vault_id = instance.resource_ids().begin()[pos]; |
| auto vault_key = storage_vault_key({instance.instance_id(), vault_id}); |
| std::string val; |
| |
| auto err = txn->get(vault_key, &val); |
| LOG(INFO) << "get vault_key=" << hex(vault_key); |
| |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| std::stringstream ss; |
| ss << "failed to get storage vault, vault_id=" << vault_id << ", vault_name=" |
| << "" << name << " err=" << err; |
| msg = ss.str(); |
| return -1; |
| } |
| StorageVaultPB new_vault; |
| new_vault.ParseFromString(val); |
| |
| if (!new_vault.has_hdfs_info()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| std::stringstream ss; |
| ss << name << " is not hdfs storage vault"; |
| msg = ss.str(); |
| return -1; |
| } |
| |
| auto origin_vault_info = new_vault.DebugString(); |
| if (vault.has_alter_name()) { |
| if (!is_valid_storage_vault_name(vault.alter_name())) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| std::stringstream ss; |
| ss << "invalid storage vault name =" << vault.alter_name() << " the name must satisfy " |
| << pattern_str; |
| msg = ss.str(); |
| return -1; |
| } |
| |
| if (vault_exist(instance, vault.alter_name())) { |
| code = MetaServiceCode::ALREADY_EXISTED; |
| msg = fmt::format("vault_name={} already existed", vault.alter_name()); |
| return -1; |
| } |
| |
| new_vault.set_name(vault.alter_name()); |
| *name_itr = vault.alter_name(); |
| } |
| auto* alter_hdfs_info = new_vault.mutable_hdfs_info(); |
| if (hdfs_info.build_conf().has_hdfs_kerberos_keytab()) { |
| alter_hdfs_info->mutable_build_conf()->set_hdfs_kerberos_keytab( |
| hdfs_info.build_conf().hdfs_kerberos_keytab()); |
| } |
| if (hdfs_info.build_conf().has_hdfs_kerberos_principal()) { |
| alter_hdfs_info->mutable_build_conf()->set_hdfs_kerberos_principal( |
| hdfs_info.build_conf().hdfs_kerberos_principal()); |
| } |
| if (hdfs_info.build_conf().has_user()) { |
| alter_hdfs_info->mutable_build_conf()->set_user(hdfs_info.build_conf().user()); |
| } |
| if (0 != hdfs_info.build_conf().hdfs_confs_size()) { |
| alter_hdfs_info->mutable_build_conf()->mutable_hdfs_confs()->Add( |
| hdfs_info.build_conf().hdfs_confs().begin(), |
| hdfs_info.build_conf().hdfs_confs().end()); |
| } |
| auto new_vault_info = new_vault.DebugString(); |
| |
| val = new_vault.SerializeAsString(); |
| if (val.empty()) { |
| msg = "failed to serialize"; |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| return -1; |
| } |
| |
| txn->put(vault_key, val); |
| LOG(INFO) << "put vault_id=" << vault_id << ", vault_key=" << hex(vault_key) |
| << ", origin vault=" << origin_vault_info << ", new_vault=" << new_vault_info; |
| |
| DCHECK_EQ(new_vault.id(), vault_id); |
| response->set_storage_vault_id(new_vault.id()); |
| return 0; |
| } |
| |
| static int alter_s3_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Transaction>& txn, |
| const StorageVaultPB& vault, MetaServiceCode& code, |
| std::string& msg, AlterObjStoreInfoResponse* response) { |
| if (!vault.has_obj_info()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| std::stringstream ss; |
| ss << "There is no s3 vault provided"; |
| msg = ss.str(); |
| return -1; |
| } |
| const auto& obj_info = vault.obj_info(); |
| if (obj_info.has_bucket() || obj_info.has_endpoint() || obj_info.has_prefix() || |
| obj_info.has_provider()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| std::stringstream ss; |
| ss << "Bucket, endpoint, prefix and provider can not be altered"; |
| msg = ss.str(); |
| return -1; |
| } |
| |
| const auto& name = vault.name(); |
| // Here we try to get mutable iter since we might need to alter the vault name |
| auto name_itr = std::find_if(instance.mutable_storage_vault_names()->begin(), |
| instance.mutable_storage_vault_names()->end(), |
| [&](const auto& vault_name) { return name == vault_name; }); |
| if (name_itr == instance.storage_vault_names().end()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| std::stringstream ss; |
| ss << "invalid storage vault name, not found, name =" << name; |
| msg = ss.str(); |
| return -1; |
| } |
| auto pos = name_itr - instance.storage_vault_names().begin(); |
| std::string vault_id = instance.resource_ids().begin()[pos]; |
| auto vault_key = storage_vault_key({instance.instance_id(), vault_id}); |
| std::string val; |
| |
| auto err = txn->get(vault_key, &val); |
| LOG(INFO) << "get vault_key=" << hex(vault_key); |
| |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| std::stringstream ss; |
| ss << "failed to get storage vault, vault_id=" << vault_id << ", vault_name=" |
| << "" << name << " err=" << err; |
| msg = ss.str(); |
| return -1; |
| } |
| StorageVaultPB new_vault; |
| new_vault.ParseFromString(val); |
| if (!new_vault.has_obj_info()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| std::stringstream ss; |
| ss << name << " is not s3 storage vault"; |
| msg = ss.str(); |
| return -1; |
| } |
| |
| auto origin_vault_info = new_vault.DebugString(); |
| |
| if (vault.has_alter_name()) { |
| if (!is_valid_storage_vault_name(vault.alter_name())) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| std::stringstream ss; |
| ss << "invalid storage vault name =" << vault.alter_name() << " the name must satisfy " |
| << pattern_str; |
| msg = ss.str(); |
| return -1; |
| } |
| |
| if (vault_exist(instance, vault.alter_name())) { |
| code = MetaServiceCode::ALREADY_EXISTED; |
| msg = fmt::format("vault_name={} already existed", vault.alter_name()); |
| return -1; |
| } |
| |
| new_vault.set_name(vault.alter_name()); |
| *name_itr = vault.alter_name(); |
| } |
| |
| if (obj_info.has_role_arn() && (obj_info.has_ak() || obj_info.has_sk())) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "invaild argument, both set ak/sk and role_arn is not allowed"; |
| LOG(WARNING) << msg; |
| return -1; |
| } |
| |
| if (obj_info.has_ak() ^ obj_info.has_sk()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| std::stringstream ss; |
| ss << "Accesskey and secretkey must be alter together"; |
| msg = ss.str(); |
| return -1; |
| } |
| |
| if (obj_info.has_ak()) { |
| EncryptionInfoPB encryption_info = new_vault.obj_info().encryption_info(); |
| AkSkPair new_ak_sk_pair {new_vault.obj_info().ak(), new_vault.obj_info().sk()}; |
| |
| // ak and sk must be altered together, there is check before. |
| auto ret = encrypt_ak_sk_helper(obj_info.ak(), obj_info.sk(), &encryption_info, |
| &new_ak_sk_pair, code, msg); |
| if (ret != 0) { |
| msg = "failed to encrypt"; |
| code = MetaServiceCode::ERR_ENCRYPT; |
| LOG(WARNING) << msg; |
| return -1; |
| } |
| new_vault.mutable_obj_info()->clear_role_arn(); |
| new_vault.mutable_obj_info()->clear_external_id(); |
| new_vault.mutable_obj_info()->clear_cred_provider_type(); |
| |
| new_vault.mutable_obj_info()->set_ak(new_ak_sk_pair.first); |
| new_vault.mutable_obj_info()->set_sk(new_ak_sk_pair.second); |
| new_vault.mutable_obj_info()->mutable_encryption_info()->CopyFrom(encryption_info); |
| } |
| |
| if (obj_info.has_role_arn()) { |
| new_vault.mutable_obj_info()->clear_ak(); |
| new_vault.mutable_obj_info()->clear_sk(); |
| new_vault.mutable_obj_info()->clear_encryption_info(); |
| |
| new_vault.mutable_obj_info()->set_role_arn(obj_info.role_arn()); |
| new_vault.mutable_obj_info()->set_cred_provider_type(CredProviderTypePB::INSTANCE_PROFILE); |
| if (obj_info.has_external_id()) { |
| new_vault.mutable_obj_info()->set_external_id(obj_info.external_id()); |
| } |
| } |
| |
| if (obj_info.has_use_path_style()) { |
| new_vault.mutable_obj_info()->set_use_path_style(obj_info.use_path_style()); |
| } |
| |
| auto now_time = std::chrono::system_clock::now(); |
| uint64_t time = |
| std::chrono::duration_cast<std::chrono::seconds>(now_time.time_since_epoch()).count(); |
| new_vault.mutable_obj_info()->set_mtime(time); |
| |
| auto new_vault_info = new_vault.DebugString(); |
| val = new_vault.SerializeAsString(); |
| if (val.empty()) { |
| msg = "failed to serialize"; |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| return -1; |
| } |
| |
| txn->put(vault_key, val); |
| LOG(INFO) << "put vault_id=" << vault_id << ", vault_key=" << hex(vault_key) |
| << ", origin vault=" << encryt_sk(hide_ak(origin_vault_info)) |
| << ", new vault=" << encryt_sk(hide_ak(new_vault_info)); |
| |
| DCHECK_EQ(new_vault.id(), vault_id); |
| response->set_storage_vault_id(new_vault.id()); |
| return 0; |
| } |
| |
// Bundle of references to caller-owned strings describing one object storage
// endpoint. Filled by extract_object_storage_info() and consumed by
// object_info_pb_factory() via structured bindings
// (`auto& [ak, sk, ...] = obj_desc;`), so the member ORDER is part of the
// interface and must stay in sync with every binding site.
struct ObjectStorageDesc {
    std::string& ak;                 // access key (ciphertext after extraction)
    std::string& sk;                 // secret key (ciphertext after extraction)
    std::string& bucket;
    std::string& prefix;
    std::string& endpoint;           // internal endpoint
    std::string& external_endpoint;  // endpoint reachable from outside
    std::string& region;
    bool& use_path_style;            // path-style vs virtual-hosted bucket addressing

    // Alternative credential mode: assume-role instead of ak/sk.
    std::string& role_arn;
    std::string& external_id;
};
| |
| static int extract_object_storage_info(const AlterObjStoreInfoRequest* request, |
| MetaServiceCode& code, std::string& msg, |
| ObjectStorageDesc& obj_desc, |
| EncryptionInfoPB& encryption_info, |
| AkSkPair& cipher_ak_sk_pair) { |
| if (!request->has_obj() && (!request->has_vault() || !request->vault().has_obj_info())) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "s3 obj info err " + proto_to_json(*request); |
| return -1; |
| } |
| |
| const auto& obj = request->has_obj() ? request->obj() : request->vault().obj_info(); |
| |
| // obj size > 1k, refuse |
| if (obj.ByteSizeLong() > 1024) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "s3 obj info greater than 1k " + proto_to_json(*request); |
| return -1; |
| }; |
| |
| auto& [ak, sk, bucket, prefix, endpoint, external_endpoint, region, use_path_style, role_arn, |
| external_id] = obj_desc; |
| |
| if (!obj.has_role_arn()) { |
| if (!obj.has_ak() || !obj.has_sk()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "s3 obj info err " + proto_to_json(*request); |
| LOG(INFO) << msg; |
| return -1; |
| } |
| |
| std::string plain_ak = obj.has_ak() ? obj.ak() : ""; |
| std::string plain_sk = obj.has_sk() ? obj.sk() : ""; |
| auto ret = encrypt_ak_sk_helper(plain_ak, plain_sk, &encryption_info, &cipher_ak_sk_pair, |
| code, msg); |
| if (ret != 0) { |
| return -1; |
| } |
| |
| ak = cipher_ak_sk_pair.first; |
| sk = cipher_ak_sk_pair.second; |
| } else { |
| if (obj.has_ak() || obj.has_sk()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "invaild argument, both set ak/sk and role_arn is not allowed"; |
| return -1; |
| } |
| |
| role_arn = obj.has_role_arn() ? obj.role_arn() : ""; |
| external_id = obj.has_external_id() ? obj.external_id() : ""; |
| } |
| TEST_SYNC_POINT_CALLBACK("extract_object_storage_info:get_aksk_pair", &cipher_ak_sk_pair); |
| bucket = obj.has_bucket() ? obj.bucket() : ""; |
| prefix = obj.has_prefix() ? obj.prefix() : ""; |
| endpoint = obj.has_endpoint() ? obj.endpoint() : ""; |
| external_endpoint = obj.has_external_endpoint() ? obj.external_endpoint() : ""; |
| region = obj.has_region() ? obj.region() : ""; |
| use_path_style = obj.use_path_style(); |
| return 0; |
| } |
| |
| static ObjectStoreInfoPB object_info_pb_factory(ObjectStorageDesc& obj_desc, |
| const ObjectStoreInfoPB& obj, |
| InstanceInfoPB& instance, |
| EncryptionInfoPB& encryption_info, |
| AkSkPair& cipher_ak_sk_pair) { |
| ObjectStoreInfoPB last_item; |
| auto& [ak, sk, bucket, prefix, endpoint, external_endpoint, region, use_path_style, role_arn, |
| external_id] = obj_desc; |
| auto now_time = std::chrono::system_clock::now(); |
| uint64_t time = |
| std::chrono::duration_cast<std::chrono::seconds>(now_time.time_since_epoch()).count(); |
| last_item.set_ctime(time); |
| last_item.set_mtime(time); |
| last_item.set_id(next_available_vault_id(instance)); |
| if (obj.has_user_id()) { |
| last_item.set_user_id(obj.user_id()); |
| } |
| |
| if (!obj.has_role_arn()) { |
| last_item.set_ak(std::move(cipher_ak_sk_pair.first)); |
| last_item.set_sk(std::move(cipher_ak_sk_pair.second)); |
| last_item.mutable_encryption_info()->CopyFrom(encryption_info); |
| } else { |
| last_item.set_role_arn(role_arn); |
| last_item.set_external_id(external_id); |
| last_item.set_cred_provider_type(CredProviderTypePB::INSTANCE_PROFILE); |
| } |
| last_item.set_bucket(bucket); |
| // format prefix, such as `/aa/bb/`, `aa/bb//`, `//aa/bb`, ` /aa/bb` -> `aa/bb` |
| trim(prefix); |
| last_item.set_prefix(prefix); |
| last_item.set_endpoint(endpoint); |
| last_item.set_external_endpoint(external_endpoint); |
| last_item.set_region(region); |
| last_item.set_provider(obj.provider()); |
| last_item.set_sse_enabled(instance.sse_enabled()); |
| last_item.set_use_path_style(use_path_style); |
| |
| return last_item; |
| } |
| |
// RPC: mutate an instance's storage vault configuration according to
// request->op(): add S3/HDFS/built-in vaults, alter or drop existing ones,
// and set/unset the instance's default vault. The InstanceInfoPB is read
// and rewritten under a single transaction committed at the end, so every
// early `return` on error leaves the persisted state untouched.
// NOTE: `code`, `msg`, `ss`, `txn`, `instance_id` and the deferred response
// finalization are introduced by the RPC_PREPROCESS macro.
void MetaServiceImpl::alter_storage_vault(google::protobuf::RpcController* controller,
                                          const AlterObjStoreInfoRequest* request,
                                          AlterObjStoreInfoResponse* response,
                                          ::google::protobuf::Closure* done) {
    // Holders filled by extract_object_storage_info() for the S3 ops.
    std::string ak, sk, bucket, prefix, endpoint, external_endpoint, region, role_arn, external_id;
    // NOTE(review): only initialized by extract_object_storage_info(); the
    // non-S3 paths never read it, but a default init would be safer.
    bool use_path_style;
    EncryptionInfoPB encryption_info;
    AkSkPair cipher_ak_sk_pair;
    RPC_PREPROCESS(alter_storage_vault, get, put, del);
    // First pass: per-op request-shape validation; no state is touched yet.
    switch (request->op()) {
    case AlterObjStoreInfoRequest::ADD_S3_VAULT:
    case AlterObjStoreInfoRequest::DROP_S3_VAULT: {
        auto tmp_desc = ObjectStorageDesc {ak, sk,
                                           bucket, prefix,
                                           endpoint, external_endpoint,
                                           region, use_path_style,
                                           role_arn, external_id};
        if (0 != extract_object_storage_info(request, code, msg, tmp_desc, encryption_info,
                                             cipher_ak_sk_pair)) {
            return;
        }
    } break;
    case AlterObjStoreInfoRequest::ADD_BUILT_IN_VAULT: {
        // It should at least has one hdfs info or obj info inside storage vault
        if ((!request->has_vault())) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            msg = "storage vault is set " + proto_to_json(*request);
            return;
        }
        break;
    }
    case AlterObjStoreInfoRequest::ADD_HDFS_INFO:
    case AlterObjStoreInfoRequest::DROP_HDFS_INFO: {
        if (!request->has_vault() || !request->vault().has_name()) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            msg = "hdfs info is not found " + proto_to_json(*request);
            return;
        }
    } break;
    case AlterObjStoreInfoRequest::SET_DEFAULT_VAULT: {
        if (!request->has_vault() || !request->vault().has_name()) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            msg = "hdfs info is not found " + proto_to_json(*request);
            return;
        }
        break;
    }
    // Validation for the remaining ops happens in their handlers below.
    case AlterObjStoreInfoRequest::ALTER_S3_VAULT:
        break;
    case AlterObjStoreInfoRequest::ALTER_HDFS_VAULT:
        break;
    case AlterObjStoreInfoRequest::UNSET_DEFAULT_VAULT:
        break;
    case AlterObjStoreInfoRequest::UNKNOWN: {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "Unknown alter info " + proto_to_json(*request);
        return;
    } break;
    default:
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "Unknown alter obj store info, request info " + proto_to_json(*request);
        LOG_WARNING("Unknown alter obj store info, request info {}", request->DebugString());
        return;
    }

    // TODO(dx): check s3 info right

    std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
    if (cloud_unique_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "cloud unique id not set";
        return;
    }

    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
    if (instance_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "empty instance_id";
        LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id;
        return;
    }
    // NOTE(review): rate limited under the alter_obj_store_info bucket,
    // presumably so both RPCs share one limit -- confirm this is intentional.
    RPC_RATE_LIMIT(alter_obj_store_info)
    InstanceKeyInfo key_info {instance_id};
    std::string key;
    std::string val;
    instance_key(key_info, &key);

    TxnErrorCode err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::CREATE>(err);
        msg = "failed to create txn";
        LOG(WARNING) << msg << " err=" << err;
        return;
    }
    err = txn->get(key, &val);
    LOG(INFO) << "get instance_key=" << hex(key);

    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::READ>(err);
        ss << "failed to get instance, instance_id=" << instance_id << " err=" << err;
        msg = ss.str();
        return;
    }

    InstanceInfoPB instance;
    if (!instance.ParseFromString(val)) {
        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
        msg = "failed to parse InstanceInfoPB";
        return;
    }

    if (instance.status() == InstanceInfoPB::DELETED) {
        code = MetaServiceCode::CLUSTER_NOT_FOUND;
        msg = "instance status has been set delete, plz check it";
        return;
    }

    // Second pass: apply the op to the in-memory instance and the txn.
    switch (request->op()) {
    case AlterObjStoreInfoRequest::ADD_S3_VAULT: {
        if (!instance.enable_storage_vault()) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            // NOTE(review): message reads oddly; it means the instance has not
            // enabled the storage vault feature.
            msg = "Storage vault doesn't support storage vault";
            return;
        }
        auto& obj = request->has_obj() ? request->obj() : request->vault().obj_info();
        if (!obj.has_provider()) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            msg = "s3 conf lease provider info";
            return;
        }
        // Hard cap on object store entries per instance.
        if (instance.obj_info().size() >= 10) {
            code = MetaServiceCode::UNDEFINED_ERR;
            msg = "this instance history has greater than 10 objs, please new another instance";
            return;
        }
        // ATTN: prefix may be empty
        if (((ak.empty() || sk.empty()) && role_arn.empty()) || bucket.empty() ||
            endpoint.empty() || region.empty()) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            msg = "s3 conf info err, please check it";
            return;
        }

        if (!role_arn.empty()) {
            // role_arn mode is only supported for the plain S3 provider with
            // the INSTANCE_PROFILE credential provider.
            if (!obj.has_cred_provider_type() ||
                obj.cred_provider_type() != CredProviderTypePB::INSTANCE_PROFILE ||
                !obj.has_provider() || obj.provider() != ObjectStoreInfoPB::S3) {
                code = MetaServiceCode::INVALID_ARGUMENT;
                msg = "s3 conf info err with role_arn, please check it";
                return;
            }
        }

        // Reject an exact duplicate of an existing object store config.
        auto& objs = instance.obj_info();
        for (auto& it : objs) {
            if (bucket == it.bucket() && prefix == it.prefix() && endpoint == it.endpoint() &&
                region == it.region() && ak == it.ak() && sk == it.sk() &&
                obj.provider() == it.provider() && external_endpoint == it.external_endpoint()) {
                // err, anything not changed
                code = MetaServiceCode::INVALID_ARGUMENT;
                msg = "original obj infos has a same conf, please check it";
                return;
            }
        }
        // calc id
        auto tmp_tuple = ObjectStorageDesc {ak, sk,
                                            bucket, prefix,
                                            endpoint, external_endpoint,
                                            region, use_path_style,
                                            role_arn, external_id};
        ObjectStoreInfoPB last_item = object_info_pb_factory(tmp_tuple, obj, instance,
                                                             encryption_info, cipher_ak_sk_pair);
        // Vault names must be unique within the instance.
        if (instance.storage_vault_names().end() !=
            std::find_if(instance.storage_vault_names().begin(),
                         instance.storage_vault_names().end(),
                         [&](const std::string& candidate_name) {
                             return candidate_name == request->vault().name();
                         })) {
            code = MetaServiceCode::ALREADY_EXISTED;
            msg = fmt::format("vault_name={} already created", request->vault().name());
            return;
        }
        StorageVaultPB vault;
        vault.set_id(last_item.id());
        vault.set_name(request->vault().name());
        // Keep the parallel id/name arrays in sync.
        *instance.mutable_resource_ids()->Add() = vault.id();
        *instance.mutable_storage_vault_names()->Add() = vault.name();
        vault.mutable_obj_info()->MergeFrom(last_item);
        auto vault_key = storage_vault_key({instance.instance_id(), last_item.id()});
        txn->put(vault_key, vault.SerializeAsString());
        if (request->has_set_as_default_storage_vault() &&
            request->set_as_default_storage_vault()) {
            response->set_default_storage_vault_replaced(instance.has_default_storage_vault_id());
            set_default_vault_log_helper(instance, vault.name(), vault.id());
            instance.set_default_storage_vault_id(vault.id());
            instance.set_default_storage_vault_name(vault.name());
        }
        response->set_storage_vault_id(vault.id());
        LOG_INFO("try to put storage vault_id={}, vault_name={}, vault_key={}", vault.id(),
                 vault.name(), hex(vault_key));
    } break;
    case AlterObjStoreInfoRequest::ADD_HDFS_INFO: {
        // NOTE(review): const_cast here suggests add_vault_into_instance may
        // mutate the request's vault (e.g. assign an id) -- confirm.
        if (auto ret = add_vault_into_instance(
                    instance, txn.get(), const_cast<StorageVaultPB&>(request->vault()), code, msg);
            ret != 0) {
            return;
        }
        if (request->has_set_as_default_storage_vault() &&
            request->set_as_default_storage_vault()) {
            response->set_default_storage_vault_replaced(instance.has_default_storage_vault_id());
            // The vault just added is the last entry of the parallel arrays.
            set_default_vault_log_helper(instance, *instance.storage_vault_names().rbegin(),
                                         *instance.resource_ids().rbegin());
            instance.set_default_storage_vault_id(*instance.resource_ids().rbegin());
            instance.set_default_storage_vault_name(*instance.storage_vault_names().rbegin());
        }
        response->set_storage_vault_id(request->vault().id());
        break;
    }
    case AlterObjStoreInfoRequest::ADD_BUILT_IN_VAULT: {
        // If the resource ids is empty then it would be the first vault
        if (!instance.resource_ids().empty()) {
            std::stringstream ss;
            code = MetaServiceCode::INVALID_ARGUMENT;
            ss << "Default vault can not be modified";
            msg = ss.str();
            return;
        }
        if (auto ret = add_vault_into_instance(
                    instance, txn.get(), const_cast<StorageVaultPB&>(request->vault()), code, msg);
            ret != 0) {
            return;
        }
        // NOTE(review): returns instead of `break`, so the updated instance
        // record is NOT written back and the txn is NOT committed below --
        // verify add_vault_into_instance persists everything this op needs.
        return;
    }
    case AlterObjStoreInfoRequest::DROP_HDFS_INFO: {
        if (auto ret = remove_hdfs_storage_vault(instance, txn.get(), request->vault(), code, msg);
            ret != 0) {
            return;
        }
        break;
    }
    case AlterObjStoreInfoRequest::SET_DEFAULT_VAULT: {
        const auto& name = request->vault().name();
        auto name_itr = std::find_if(instance.storage_vault_names().begin(),
                                     instance.storage_vault_names().end(),
                                     [&](const auto& vault_name) { return name == vault_name; });
        if (name_itr == instance.storage_vault_names().end()) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            ss << "invalid storage vault name, name =" << name;
            msg = ss.str();
            return;
        }
        // storage_vault_names and resource_ids are parallel arrays.
        auto pos = name_itr - instance.storage_vault_names().begin();
        std::string vault_id = instance.resource_ids().begin()[pos];
        response->set_default_storage_vault_replaced(instance.has_default_storage_vault_id());
        set_default_vault_log_helper(instance, name, vault_id);
        instance.set_default_storage_vault_id(vault_id);
        instance.set_default_storage_vault_name(name);
        response->set_storage_vault_id(vault_id);
        break;
    }
    case AlterObjStoreInfoRequest::UNSET_DEFAULT_VAULT: {
        LOG_INFO("unset instance's default vault, instance id {}, previous default vault {}, id {}",
                 instance.instance_id(), instance.default_storage_vault_name(),
                 instance.default_storage_vault_id());
        instance.clear_default_storage_vault_id();
        instance.clear_default_storage_vault_name();
        break;
    }
    case AlterObjStoreInfoRequest::ALTER_S3_VAULT: {
        // Helper sets code/msg on failure; fall through to commit either way
        // (on failure nothing was put into the txn).
        alter_s3_storage_vault(instance, txn, request->vault(), code, msg, response);
        break;
    }
    case AlterObjStoreInfoRequest::ALTER_HDFS_VAULT: {
        alter_hdfs_storage_vault(instance, txn, request->vault(), code, msg, response);
        break;
    }
    case AlterObjStoreInfoRequest::DROP_S3_VAULT:
        [[fallthrough]];
    default: {
        code = MetaServiceCode::INVALID_ARGUMENT;
        ss << "invalid request op, op=" << request->op();
        msg = ss.str();
        return;
    }
    }

    LOG(INFO) << "instance " << instance_id << " has " << instance.obj_info().size()
              << " s3 history info, and instance = " << proto_to_json(instance);

    val = instance.SerializeAsString();
    if (val.empty()) {
        msg = "failed to serialize";
        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
        return;
    }

    // Bump the instance update counter and persist the mutated instance.
    txn->atomic_add(system_meta_service_instance_update_key(), 1);
    txn->put(key, val);
    LOG(INFO) << "put instance_id=" << instance_id << " instance_key=" << hex(key);
    err = txn->commit();
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::COMMIT>(err);
        msg = fmt::format("failed to commit kv txn, err={}", err);
        LOG(WARNING) << msg;
    }
}
| |
| void MetaServiceImpl::alter_obj_store_info(google::protobuf::RpcController* controller, |
| const AlterObjStoreInfoRequest* request, |
| AlterObjStoreInfoResponse* response, |
| ::google::protobuf::Closure* done) { |
| std::string ak, sk, bucket, prefix, endpoint, external_endpoint, region, role_arn, external_id; |
| bool use_path_style; |
| EncryptionInfoPB encryption_info; |
| AkSkPair cipher_ak_sk_pair; |
| RPC_PREPROCESS(alter_obj_store_info, get, put); |
| switch (request->op()) { |
| case AlterObjStoreInfoRequest::ADD_OBJ_INFO: |
| case AlterObjStoreInfoRequest::LEGACY_UPDATE_AK_SK: |
| case AlterObjStoreInfoRequest::ALTER_OBJ_INFO: |
| case AlterObjStoreInfoRequest::UPDATE_AK_SK: { |
| auto tmp_desc = ObjectStorageDesc {ak, sk, |
| bucket, prefix, |
| endpoint, external_endpoint, |
| region, use_path_style, |
| role_arn, external_id}; |
| if (0 != extract_object_storage_info(request, code, msg, tmp_desc, encryption_info, |
| cipher_ak_sk_pair)) { |
| return; |
| } |
| } break; |
| case AlterObjStoreInfoRequest::UNKNOWN: { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "Unknown alter info " + proto_to_json(*request); |
| return; |
| } break; |
| default: |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "Unknown alter obj store info, request info " + proto_to_json(*request); |
| LOG_WARNING("Unknown alter obj store info, request info {}", request->DebugString()); |
| return; |
| } |
| |
| // TODO(dx): check s3 info right |
| |
| std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : ""; |
| if (cloud_unique_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "cloud unique id not set"; |
| return; |
| } |
| |
| instance_id = get_instance_id(resource_mgr_, cloud_unique_id); |
| if (instance_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "empty instance_id"; |
| LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id; |
| return; |
| } |
| RPC_RATE_LIMIT(alter_obj_store_info) |
| InstanceKeyInfo key_info {instance_id}; |
| std::string key; |
| std::string val; |
| instance_key(key_info, &key); |
| |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| msg = "failed to create txn"; |
| LOG(WARNING) << msg << " err=" << err; |
| return; |
| } |
| err = txn->get(key, &val); |
| LOG(INFO) << "get instance_key=" << hex(key); |
| |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| ss << "failed to get instance, instance_id=" << instance_id << " err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| |
| InstanceInfoPB instance; |
| if (!instance.ParseFromString(val)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| msg = "failed to parse InstanceInfoPB"; |
| return; |
| } |
| |
| if (instance.status() == InstanceInfoPB::DELETED) { |
| code = MetaServiceCode::CLUSTER_NOT_FOUND; |
| msg = "instance status has been set delete, plz check it"; |
| return; |
| } |
| |
| switch (request->op()) { |
| case AlterObjStoreInfoRequest::LEGACY_UPDATE_AK_SK: |
| case AlterObjStoreInfoRequest::ALTER_OBJ_INFO: { |
| // get id |
| std::string id = request->obj().has_id() ? request->obj().id() : "0"; |
| int idx = std::stoi(id); |
| if (idx < 1 || idx > instance.obj_info().size()) { |
| // err |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "id invalid, please check it"; |
| return; |
| } |
| auto& obj_info = |
| const_cast<std::decay_t<decltype(instance.obj_info())>&>(instance.obj_info()); |
| for (auto& it : obj_info) { |
| if (std::stoi(it.id()) == idx) { |
| if (role_arn.empty()) { |
| if (it.ak() == ak && it.sk() == sk) { |
| // not change, just return ok |
| code = MetaServiceCode::OK; |
| msg = "ak/sk not changed"; |
| return; |
| } |
| it.clear_role_arn(); |
| it.clear_external_id(); |
| it.clear_cred_provider_type(); |
| |
| it.set_ak(ak); |
| it.set_sk(sk); |
| it.mutable_encryption_info()->CopyFrom(encryption_info); |
| } else { |
| if (!ak.empty() || !sk.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "invaild argument, both set ak/sk and role_arn is not allowed"; |
| LOG(INFO) << msg; |
| return; |
| } |
| |
| if (it.provider() != ObjectStoreInfoPB::S3) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "role_arn is only supported for s3 provider"; |
| LOG(INFO) << msg << " provider=" << it.provider(); |
| return; |
| } |
| |
| if (it.role_arn() == role_arn && it.external_id() == external_id) { |
| // not change, just return ok |
| code = MetaServiceCode::OK; |
| msg = "ak/sk not changed"; |
| return; |
| } |
| it.clear_ak(); |
| it.clear_sk(); |
| it.clear_encryption_info(); |
| |
| it.set_role_arn(role_arn); |
| it.set_external_id(external_id); |
| it.set_cred_provider_type(CredProviderTypePB::INSTANCE_PROFILE); |
| } |
| |
| auto now_time = std::chrono::system_clock::now(); |
| uint64_t time = std::chrono::duration_cast<std::chrono::seconds>( |
| now_time.time_since_epoch()) |
| .count(); |
| it.set_mtime(time); |
| } |
| } |
| } break; |
| case AlterObjStoreInfoRequest::ADD_OBJ_INFO: { |
| if (instance.enable_storage_vault()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "Storage vault doesn't support add obj info"; |
| return; |
| } |
| auto& obj = request->has_obj() ? request->obj() : request->vault().obj_info(); |
| if (!obj.has_provider()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "s3 conf lease provider info"; |
| return; |
| } |
| if (instance.obj_info().size() >= 10) { |
| code = MetaServiceCode::UNDEFINED_ERR; |
| msg = "this instance history has greater than 10 objs, please new another instance"; |
| return; |
| } |
| // ATTN: prefix may be empty |
| if (((ak.empty() || sk.empty()) && role_arn.empty()) || bucket.empty() || |
| endpoint.empty() || region.empty() || prefix.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "s3 conf info err, please check it"; |
| return; |
| } |
| |
| auto& objs = instance.obj_info(); |
| for (auto& it : objs) { |
| if (bucket == it.bucket() && prefix == it.prefix() && endpoint == it.endpoint() && |
| region == it.region() && ak == it.ak() && sk == it.sk() && |
| obj.provider() == it.provider() && external_endpoint == it.external_endpoint()) { |
| // err, anything not changed |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "original obj infos has a same conf, please check it"; |
| return; |
| } |
| } |
| // calc id |
| auto tmp_tuple = ObjectStorageDesc {ak, sk, |
| bucket, prefix, |
| endpoint, external_endpoint, |
| region, use_path_style, |
| role_arn, external_id}; |
| ObjectStoreInfoPB last_item = object_info_pb_factory(tmp_tuple, obj, instance, |
| encryption_info, cipher_ak_sk_pair); |
| instance.add_obj_info()->CopyFrom(last_item); |
| LOG_INFO("Instance {} tries to put obj info", instance.instance_id()); |
| } break; |
| default: { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| ss << "invalid request op, op=" << request->op(); |
| msg = ss.str(); |
| return; |
| } |
| } |
| |
| LOG(INFO) << "instance " << instance_id << " has " << instance.obj_info().size() |
| << " s3 history info, and instance = " << proto_to_json(instance); |
| |
| val = instance.SerializeAsString(); |
| if (val.empty()) { |
| msg = "failed to serialize"; |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| return; |
| } |
| |
| txn->atomic_add(system_meta_service_instance_update_key(), 1); |
| txn->put(key, val); |
| LOG(INFO) << "put instance_id=" << instance_id << " instance_key=" << hex(key); |
| err = txn->commit(); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::COMMIT>(err); |
| msg = fmt::format("failed to commit kv txn, err={}", err); |
| LOG(WARNING) << msg; |
| } |
| } |
| |
// Updates the AK/SK credentials (ram user and/or internal bucket users) of an
// instance, then cascades the same update to derived instances when snapshots are
// enabled. The root instance is committed in its own transaction first; each derived
// instance is then updated in an independent transaction. update_ak_sk is idempotent,
// so a cascade failure is only logged — the root update stays committed.
void MetaServiceImpl::update_ak_sk(google::protobuf::RpcController* controller,
                                   const UpdateAkSkRequest* request, UpdateAkSkResponse* response,
                                   ::google::protobuf::Closure* done) {
    RPC_PREPROCESS(update_ak_sk, get, put);
    instance_id = request->has_instance_id() ? request->instance_id() : "";
    if (instance_id.empty()) {
        msg = "instance id not set";
        code = MetaServiceCode::INVALID_ARGUMENT;
        return;
    }
    // At least one of ram_user / internal_bucket_user must be present to update.
    if (!request->has_ram_user() && request->internal_bucket_user().empty()) {
        msg = "nothing to update";
        code = MetaServiceCode::INVALID_ARGUMENT;
        return;
    }
    RPC_RATE_LIMIT(update_ak_sk)

    InstanceKeyInfo key_info {instance_id};
    std::string key;
    std::string val;
    instance_key(key_info, &key);

    TxnErrorCode err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::CREATE>(err);
        msg = "failed to create txn";
        LOG(WARNING) << msg << " err=" << err;
        return;
    }
    // Load and decode the current instance record.
    err = txn->get(key, &val);
    LOG(INFO) << "get instance_key=" << hex(key);

    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::READ>(err);
        ss << "failed to get instance, instance_id=" << instance_id << " err=" << err;
        msg = ss.str();
        return;
    }

    InstanceInfoPB instance;
    if (!instance.ParseFromString(val)) {
        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
        msg = "failed to parse InstanceInfoPB";
        return;
    }

    if (instance.status() == InstanceInfoPB::DELETED) {
        code = MetaServiceCode::CLUSTER_NOT_FOUND;
        msg = "instance status has been set delete, plz check it";
        return;
    }

    // Current wall-clock time in epoch seconds; passed to the update helper
    // (presumably recorded as the credentials' mtime — helper is defined elsewhere).
    auto now_time = std::chrono::system_clock::now();
    uint64_t time =
            std::chrono::duration_cast<std::chrono::seconds>(now_time.time_since_epoch()).count();

    std::stringstream update_record;

    // Update instance using helper function
    if (update_instance_ak_sk(instance, request, time, code, msg, update_record) != 0) {
        return;
    }

    LOG(INFO) << "instance " << instance_id << " has " << instance.obj_info().size()
              << " s3 history info, and " << instance.resource_ids_size() << " vaults "
              << "instance = " << proto_to_json(instance);

    val = instance.SerializeAsString();
    if (val.empty()) {
        msg = "failed to serialize";
        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
        return;
    }

    txn->atomic_add(system_meta_service_instance_update_key(), 1);
    txn->put(key, val);
    LOG(INFO) << "put instance_id=" << instance_id << " instance_key=" << hex(key);

    // Commit root instance first to avoid transaction timeout
    err = txn->commit();
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::COMMIT>(err);
        msg = fmt::format("failed to commit root instance kv txn, err={}", err);
        LOG(WARNING) << msg;
        return;
    }

    LOG(INFO) << update_record.str();
    async_notify_refresh_instance(txn_kv_, instance_id, true);

    // Cascade update to derived instances using separate transactions
    // update_ak_sk is idempotent, so it's safe to use independent transactions
    if (!instance.has_snapshot_switch_status() ||
        instance.snapshot_switch_status() == SNAPSHOT_SWITCH_DISABLED) {
        LOG(INFO) << "snapshot disabled for instance_id=" << instance_id
                  << ", skip cascade updating derived instances";
        return;
    }

    std::vector<std::string> cascade_instance_ids;
    if (find_cascade_instances(txn_kv_.get(), instance_id, &cascade_instance_ids) != 0) {
        LOG(WARNING) << "failed to find derived instances for cascade update, instance_id="
                     << instance_id << ", root instance already updated successfully";
        return;
    }

    // Build a readable id list purely for logging.
    std::string cascade_ids_str;
    for (size_t i = 0; i < cascade_instance_ids.size(); ++i) {
        if (i > 0) cascade_ids_str += ", ";
        cascade_ids_str += cascade_instance_ids[i];
    }
    LOG(INFO) << "Found " << cascade_instance_ids.size()
              << " derived instances to cascade update: [" << cascade_ids_str << "]";

    // Each derived instance gets a fully independent get/update/commit cycle;
    // any failure is logged and the loop continues with the next instance.
    for (const auto& cascade_id : cascade_instance_ids) {
        // Use a separate transaction for each derived instance
        std::unique_ptr<Transaction> cascade_txn;
        TxnErrorCode cascade_err = txn_kv_->create_txn(&cascade_txn);
        if (cascade_err != TxnErrorCode::TXN_OK) {
            LOG(WARNING) << "failed to create txn for derived instance, instance_id=" << cascade_id
                         << " err=" << cascade_err;
            continue;
        }

        InstanceKeyInfo cascade_key_info {cascade_id};
        std::string cascade_key;
        std::string cascade_val;
        instance_key(cascade_key_info, &cascade_key);

        cascade_err = cascade_txn->get(cascade_key, &cascade_val);
        if (cascade_err != TxnErrorCode::TXN_OK) {
            LOG(WARNING) << "failed to get derived instance, instance_id=" << cascade_id
                         << " err=" << cascade_err;
            continue;
        }

        InstanceInfoPB cascade_instance;
        if (!cascade_instance.ParseFromString(cascade_val)) {
            LOG(WARNING) << "failed to parse InstanceInfoPB for derived instance_id=" << cascade_id;
            continue;
        }

        // Update the cascade instance using helper function
        std::stringstream cascade_update_record;
        MetaServiceCode cascade_code = MetaServiceCode::OK;
        std::string cascade_msg;
        if (update_instance_ak_sk(cascade_instance, request, time, cascade_code, cascade_msg,
                                  cascade_update_record) != 0) {
            LOG(WARNING) << "failed to update derived instance, instance_id=" << cascade_id
                         << " msg=" << cascade_msg;
            continue;
        }

        cascade_val = cascade_instance.SerializeAsString();
        if (cascade_val.empty()) {
            LOG(WARNING) << "failed to serialize derived instance, instance_id=" << cascade_id;
            continue;
        }

        cascade_txn->put(cascade_key, cascade_val);

        cascade_err = cascade_txn->commit();
        if (cascade_err != TxnErrorCode::TXN_OK) {
            LOG(WARNING) << "failed to commit derived instance txn, instance_id=" << cascade_id
                         << " err=" << cascade_err;
            continue;
        }

        async_notify_refresh_instance(txn_kv_, cascade_id, true);
        LOG(INFO) << "cascade update for instance_id=" << cascade_id << " "
                  << cascade_update_record.str();
    }
}
| |
// Creates a new instance record keyed by request->instance_id(). Optionally encrypts
// and attaches an object-store info, a ram user, and a built-in storage vault; fails
// with ALREADY_EXISTED when an instance with the same id is already present.
void MetaServiceImpl::create_instance(google::protobuf::RpcController* controller,
                                      const CreateInstanceRequest* request,
                                      CreateInstanceResponse* response,
                                      ::google::protobuf::Closure* done) {
    TEST_SYNC_POINT_CALLBACK("create_instance_sk_request",
                             const_cast<CreateInstanceRequest**>(&request));
    RPC_PREPROCESS(create_instance, get, put);
    TEST_SYNC_POINT_RETURN_WITH_VOID("create_instance_sk_request_return");
    // Validate ram user fields up front: id, ak and sk must all be present.
    if (request->has_ram_user()) {
        auto& ram_user = request->ram_user();
        std::string ram_user_id = ram_user.has_user_id() ? ram_user.user_id() : "";
        std::string ram_user_ak = ram_user.has_ak() ? ram_user.ak() : "";
        std::string ram_user_sk = ram_user.has_sk() ? ram_user.sk() : "";
        if (ram_user_id.empty() || ram_user_ak.empty() || ram_user_sk.empty()) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            msg = "ram user info err, please check it";
            return;
        }
    }
    instance_id = request->instance_id();
    // Prepare data
    InstanceInfoPB instance;
    instance.set_instance_id(instance_id);
    instance.set_user_id(request->has_user_id() ? request->user_id() : "");
    instance.set_name(request->has_name() ? request->name() : "");
    instance.set_status(InstanceInfoPB::NORMAL);
    instance.set_sse_enabled(request->sse_enabled());
    // Storage vault mode is enabled exactly when no legacy obj_info is supplied.
    instance.set_enable_storage_vault(!request->has_obj_info());
    if (request->has_obj_info()) {
        // NOTE: the request's obj_info is encrypted in place via const_cast, then the
        // (now encrypted) copy is appended to the instance.
        create_object_info_with_encrypt(instance,
                                        const_cast<ObjectStoreInfoPB*>(&request->obj_info()),
                                        request->sse_enabled(), code, msg);
        if (code != MetaServiceCode::OK) {
            return;
        }
        instance.mutable_obj_info()->Add()->MergeFrom(request->obj_info());
    }
    if (request->has_ram_user()) {
        // Store the ram user with its AK/SK encrypted.
        auto& ram_user = request->ram_user();
        EncryptionInfoPB encryption_info;
        AkSkPair cipher_ak_sk_pair;
        if (encrypt_ak_sk_helper(ram_user.ak(), ram_user.sk(), &encryption_info, &cipher_ak_sk_pair,
                                 code, msg) != 0) {
            return;
        }
        RamUserPB new_ram_user;
        new_ram_user.CopyFrom(ram_user);
        new_ram_user.set_ak(std::move(cipher_ak_sk_pair.first));
        new_ram_user.set_sk(std::move(cipher_ak_sk_pair.second));
        new_ram_user.mutable_encryption_info()->CopyFrom(encryption_info);
        instance.mutable_ram_user()->CopyFrom(new_ram_user);
    }
    if (config::enable_multi_version_status) {
        instance.set_multi_version_status(MultiVersionStatus::MULTI_VERSION_READ_WRITE);
        instance.set_snapshot_switch_status(SNAPSHOT_SWITCH_OFF);
        if (config::enable_cluster_snapshot) {
            instance.set_snapshot_switch_status(SNAPSHOT_SWITCH_ON);
            instance.set_snapshot_interval_seconds(config::snapshot_min_interval_seconds);
            instance.set_max_reserved_snapshot(1);
        }
    }

    if (instance.instance_id().empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "instance id not set";
        return;
    }

    TxnErrorCode err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::CREATE>(err);
        msg = "failed to create txn";
        LOG(WARNING) << msg << " err=" << err;
        return;
    }
    if (request->has_vault()) {
        // The vault supplied at creation time always becomes the built-in vault,
        // regardless of the name the caller set (mutated in place via const_cast).
        auto& param = const_cast<StorageVaultPB&>(request->vault());
        param.set_name(BUILT_IN_STORAGE_VAULT_NAME.data());
        if (0 != add_vault_into_instance(instance, txn.get(), param, code, msg)) {
            return;
        }
    }

    InstanceKeyInfo key_info {request->instance_id()};
    std::string key;
    // Serialize BEFORE hiding the access keys below, so the stored record keeps
    // the real (encrypted) AKs; the hiding only affects the debug log.
    std::string val = instance.SerializeAsString();
    instance_key(key_info, &key);
    if (val.empty()) {
        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
        msg = "failed to serialize";
        LOG(ERROR) << msg;
        return;
    }

    for (auto& obj_info : *instance.mutable_obj_info()) {
        obj_info.set_ak(hide_access_key(obj_info.ak()));
    }

    LOG(INFO) << "xxx instance json=" << proto_to_json(instance);

    // Check existence before proceeding
    // NOTE(review): `val` (holding the serialized instance) is reused as the output
    // buffer of this get; correctness relies on the KV layer leaving `val` untouched
    // on TXN_KEY_NOT_FOUND so the later put stores the intended bytes — confirm.
    err = txn->get(key, &val);
    if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
        std::stringstream ss;
        ss << (err == TxnErrorCode::TXN_OK ? "instance already existed"
                                           : "internal error failed to check instance")
           << ", instance_id=" << request->instance_id() << ", err=" << err;
        code = err == TxnErrorCode::TXN_OK ? MetaServiceCode::ALREADY_EXISTED
                                           : cast_as<ErrCategory::READ>(err);
        msg = ss.str();
        LOG(WARNING) << msg << " err=" << err;
        return;
    }

    txn->atomic_add(system_meta_service_instance_update_key(), 1);
    txn->put(key, val);
    LOG(INFO) << "put instance_id=" << request->instance_id() << " instance_key=" << hex(key);
    err = txn->commit();
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::COMMIT>(err);
        msg = fmt::format("failed to commit kv txn, err={}", err);
        LOG(WARNING) << msg;
    }

    // NOTE(review): there is no early return on commit failure above, so this
    // notification fires even when the commit failed — confirm this is intended.
    async_notify_refresh_instance(
            txn_kv_, request->instance_id(), /*include_self=*/true,
            [instance_id = request->instance_id()](const KVStats& stats) {
                if (config::use_detailed_metrics && !instance_id.empty()) {
                    g_bvar_rpc_kv_create_instance_get_bytes.put({instance_id}, stats.get_bytes);
                    g_bvar_rpc_kv_create_instance_get_counter.put({instance_id}, stats.get_counter);
                }
            });
}
| |
| std::pair<MetaServiceCode, std::string> handle_snapshot_switch(const std::string& instance_id, |
| const std::string& value, |
| InstanceInfoPB* instance) { |
| const std::string& property_name = |
| AlterInstanceRequest::SnapshotProperty_Name(AlterInstanceRequest::ENABLE_SNAPSHOT); |
| if (value != "true" && value != "false") { |
| return std::make_pair(MetaServiceCode::INVALID_ARGUMENT, |
| "Invalid value for " + property_name + " property: " + value + |
| ", expected 'true' or 'false'" + |
| ", instance_id: " + instance_id); |
| } |
| |
| // Check if snapshot is not ready (UNSUPPORTED state) |
| if (!instance->has_snapshot_switch_status() || |
| instance->snapshot_switch_status() == SNAPSHOT_SWITCH_DISABLED) { |
| return std::make_pair(MetaServiceCode::INVALID_ARGUMENT, |
| "Snapshot is not ready, instance_id: " + instance_id); |
| } else if (value == "false" && instance->snapshot_switch_status() == SNAPSHOT_SWITCH_OFF) { |
| return std::make_pair( |
| MetaServiceCode::INVALID_ARGUMENT, |
| "Snapshot is already set to SNAPSHOT_SWITCH_OFF, instance_id: " + instance_id); |
| } else if (value == "true" && instance->multi_version_status() != MULTI_VERSION_READ_WRITE) { |
| // If the multi_version_status is not READ_WRITE, cannot enable snapshot because the |
| // operation logs will be recycled directly since min_versionstamp is not set when multi |
| // version status is WRITE_ONLY |
| std::string url = |
| "${MS_ENDPOINT}/MetaService/http/" |
| "set_multi_version_status?multi_version_status=MULTI_VERSION_READ_WRITE"; |
| return std::make_pair(MetaServiceCode::INVALID_ARGUMENT, |
| fmt::format("Cannot enable snapshot when multi_version_status is not " |
| "MULTI_VERSION_READ_WRITE. Consider enabling " |
| "MULTI_VERSION_READ_WRITE status by curl {}", |
| instance_id)); |
| } else if (value == "true") { |
| instance->set_snapshot_switch_status(SNAPSHOT_SWITCH_ON); |
| |
| // Set default values when first enabling snapshot |
| if (!instance->has_snapshot_interval_seconds() || |
| instance->snapshot_interval_seconds() == 0) { |
| instance->set_snapshot_interval_seconds(config::snapshot_min_interval_seconds); |
| LOG(INFO) << "Set default snapshot_interval_seconds to " |
| << config::snapshot_min_interval_seconds << " for instance " << instance_id; |
| } |
| if (!instance->has_max_reserved_snapshot()) { |
| // Disable auto snapshot by default |
| instance->set_max_reserved_snapshot(0); |
| LOG(INFO) << "Set default max_reserved_snapshots to 0 for instance " << instance_id; |
| } |
| } else { |
| instance->set_snapshot_switch_status(SNAPSHOT_SWITCH_OFF); |
| } |
| |
| LOG(INFO) << "Set snapshot " + property_name + " to " + value + " for instance " + instance_id; |
| |
| return std::make_pair(MetaServiceCode::OK, ""); |
| } |
| |
| std::pair<MetaServiceCode, std::string> handle_max_reserved_snapshots( |
| const std::string& instance_id, const std::string& value, InstanceInfoPB* instance) { |
| const std::string& property_name = AlterInstanceRequest::SnapshotProperty_Name( |
| AlterInstanceRequest::MAX_RESERVED_SNAPSHOTS); |
| |
| int max_snapshots; |
| try { |
| max_snapshots = std::stoi(value); |
| if (max_snapshots < 0) { |
| return std::make_pair(MetaServiceCode::INVALID_ARGUMENT, |
| property_name + " must be non-negative, got: " + value); |
| } |
| if (max_snapshots > config::snapshot_max_reserved_num) { |
| return std::make_pair(MetaServiceCode::INVALID_ARGUMENT, |
| property_name + " too large, maximum is " + |
| std::to_string(config::snapshot_max_reserved_num) + |
| ", got: " + value); |
| } |
| } catch (const std::exception& e) { |
| return std::make_pair(MetaServiceCode::INVALID_ARGUMENT, |
| "Invalid numeric value for max_reserved_snapshots: " + value); |
| } |
| |
| instance->set_max_reserved_snapshot(max_snapshots); |
| |
| LOG(INFO) << "Set " + property_name + " to " + value + " for instance " + instance_id; |
| |
| return std::make_pair(MetaServiceCode::OK, ""); |
| } |
| |
| std::pair<MetaServiceCode, std::string> handle_snapshot_intervals(const std::string& instance_id, |
| const std::string& value, |
| InstanceInfoPB* instance) { |
| const std::string& property_name = AlterInstanceRequest::SnapshotProperty_Name( |
| AlterInstanceRequest::SNAPSHOT_INTERVAL_SECONDS); |
| |
| int intervals; |
| try { |
| intervals = std::stoi(value); |
| if (intervals < config::snapshot_min_interval_seconds) { |
| return std::make_pair(MetaServiceCode::INVALID_ARGUMENT, |
| property_name + " too small, minimum is " + |
| std::to_string(config::snapshot_min_interval_seconds) + |
| " seconds, got: " + value); |
| } |
| } catch (const std::exception& e) { |
| return std::make_pair(MetaServiceCode::INVALID_ARGUMENT, |
| "Invalid numeric value for " + property_name + ": " + value); |
| } |
| |
| instance->set_snapshot_interval_seconds(intervals); |
| |
| LOG(INFO) << "Set " + property_name + " to " + value + " seconds for instance " + instance_id; |
| |
| return std::make_pair(MetaServiceCode::OK, ""); |
| } |
| |
// RPC entry point for instance-level alterations (drop, rename, SSE toggle, refresh,
// overdue/normal status, snapshot properties). Each mutating op is delegated to the
// alter_instance(request, action) helper, which loads the instance record, runs the
// per-op lambda, and persists the serialized bytes the lambda returns on success.
void MetaServiceImpl::alter_instance(google::protobuf::RpcController* controller,
                                     const AlterInstanceRequest* request,
                                     AlterInstanceResponse* response,
                                     ::google::protobuf::Closure* done) {
    StopWatch sw;
    auto ctrl = static_cast<brpc::Controller*>(controller);
    LOG(INFO) << __PRETTY_FUNCTION__ << " rpc from " << ctrl->remote_side()
              << " request=" << request->ShortDebugString();
    brpc::ClosureGuard closure_guard(done);
    MetaServiceCode code = MetaServiceCode::OK;
    std::string msg = "OK";
    [[maybe_unused]] std::stringstream ss;
    std::string instance_id = request->has_instance_id() ? request->instance_id() : "";
    // Deferred: fill the response status, log the outcome, release the closure and
    // record per-instance latency metrics when enabled.
    DORIS_CLOUD_DEFER {
        response->mutable_status()->set_code(code);
        response->mutable_status()->set_msg(msg);
        LOG(INFO) << (code == MetaServiceCode::OK ? "succ to " : "failed to ")
                  << __PRETTY_FUNCTION__ << " " << ctrl->remote_side() << " " << msg;
        closure_guard.reset(nullptr);
        if (config::use_detailed_metrics && !instance_id.empty()) {
            g_bvar_ms_alter_instance.put(instance_id, sw.elapsed_us());
        }
    };

    std::pair<MetaServiceCode, std::string> ret;
    switch (request->op()) {
    // DROP is refused while the instance still owns clusters or non-recycled
    // snapshots; on success the status becomes DELETED and any snapshot reference
    // this instance held on its source snapshot is removed.
    case AlterInstanceRequest::DROP: {
        ret = alter_instance(
                request, [&request, &instance_id](Transaction* txn, InstanceInfoPB* instance) {
                    std::string msg;
                    // check instance doesn't have any cluster.
                    if (instance->clusters_size() != 0) {
                        msg = "failed to drop instance, instance has clusters";
                        LOG(WARNING) << msg;
                        return std::make_pair(MetaServiceCode::INVALID_ARGUMENT, msg);
                    }

                    // check instance doesn't have any snapshot.
                    MetaReader meta_reader(instance_id);
                    std::vector<std::pair<SnapshotPB, Versionstamp>> snapshots;
                    TxnErrorCode err = meta_reader.get_snapshots(txn, &snapshots);
                    if (err != TxnErrorCode::TXN_OK) {
                        msg = "failed to get snapshots";
                        LOG(WARNING) << msg << " err=" << err;
                        return std::make_pair(cast_as<ErrCategory::READ>(err), msg);
                    }
                    for (auto& [snapshot, _] : snapshots) {
                        if (snapshot.status() != SnapshotStatus::SNAPSHOT_RECYCLED) {
                            // still has snapshots, cannot drop
                            msg = "failed to drop instance, instance has snapshots";
                            LOG(WARNING) << msg;
                            return std::make_pair(MetaServiceCode::INVALID_ARGUMENT, msg);
                        }
                    }

                    instance->set_status(InstanceInfoPB::DELETED);
                    instance->set_mtime(
                            duration_cast<seconds>(system_clock::now().time_since_epoch()).count());

                    std::string ret = instance->SerializeAsString();
                    if (ret.empty()) {
                        msg = "failed to serialize";
                        LOG(ERROR) << msg;
                        return std::make_pair(MetaServiceCode::PROTOBUF_SERIALIZE_ERR, msg);
                    }
                    LOG(INFO) << "put instance_id=" << request->instance_id()
                              << "drop instance json=" << proto_to_json(*instance);

                    // If this instance was cloned from a snapshot, drop the reference
                    // it holds on the source snapshot so the source can be recycled.
                    if (instance->has_source_instance_id() && instance->has_source_snapshot_id() &&
                        !instance->source_instance_id().empty() &&
                        !instance->source_snapshot_id().empty()) {
                        Versionstamp snapshot_versionstamp;
                        if (!SnapshotManager::parse_snapshot_versionstamp(
                                    instance->source_snapshot_id(), &snapshot_versionstamp)) {
                            msg = "failed to parse snapshot_id to versionstamp, snapshot_id=" +
                                  instance->source_snapshot_id();
                            LOG(WARNING) << msg;
                            return std::make_pair(MetaServiceCode::INVALID_ARGUMENT, msg);
                        }
                        versioned::SnapshotReferenceKeyInfo ref_key_info {
                                instance->source_instance_id(), snapshot_versionstamp, instance_id};
                        std::string reference_key = versioned::snapshot_reference_key(ref_key_info);
                        txn->remove(reference_key);
                    }
                    return std::make_pair(MetaServiceCode::OK, ret);
                });
    } break;
    // RENAME replaces the instance's display name; the new name must be non-empty.
    case AlterInstanceRequest::RENAME: {
        ret = alter_instance(request, [&request](Transaction* txn, InstanceInfoPB* instance) {
            std::string msg;
            std::string name = request->has_name() ? request->name() : "";
            if (name.empty()) {
                msg = "rename instance name, but not set";
                LOG(WARNING) << msg;
                return std::make_pair(MetaServiceCode::INVALID_ARGUMENT, msg);
            }
            instance->set_name(name);

            std::string ret = instance->SerializeAsString();
            if (ret.empty()) {
                msg = "failed to serialize";
                LOG(ERROR) << msg;
                return std::make_pair(MetaServiceCode::PROTOBUF_SERIALIZE_ERR, msg);
            }
            LOG(INFO) << "put instance_id=" << request->instance_id()
                      << "rename instance json=" << proto_to_json(*instance);
            return std::make_pair(MetaServiceCode::OK, ret);
        });
    } break;
    // ENABLE_SSE turns on server-side encryption for the instance and every obj_info.
    case AlterInstanceRequest::ENABLE_SSE: {
        ret = alter_instance(request, [&request](Transaction* txn, InstanceInfoPB* instance) {
            std::string msg;
            if (instance->sse_enabled()) {
                msg = "failed to enable sse, instance has enabled sse";
                LOG(WARNING) << msg;
                return std::make_pair(MetaServiceCode::INVALID_ARGUMENT, msg);
            }
            instance->set_sse_enabled(true);
            instance->set_mtime(
                    duration_cast<seconds>(system_clock::now().time_since_epoch()).count());

            for (auto& obj_info : *(instance->mutable_obj_info())) {
                obj_info.set_sse_enabled(true);
            }
            std::string ret = instance->SerializeAsString();
            if (ret.empty()) {
                msg = "failed to serialize";
                LOG(ERROR) << msg;
                return std::make_pair(MetaServiceCode::PROTOBUF_SERIALIZE_ERR, msg);
            }
            LOG(INFO) << "put instance_id=" << request->instance_id()
                      << "instance enable sse json=" << proto_to_json(*instance);
            return std::make_pair(MetaServiceCode::OK, ret);
        });
    } break;
    // DISABLE_SSE is the mirror image of ENABLE_SSE.
    case AlterInstanceRequest::DISABLE_SSE: {
        ret = alter_instance(request, [&request](Transaction* txn, InstanceInfoPB* instance) {
            std::string msg;
            if (!instance->sse_enabled()) {
                msg = "failed to disable sse, instance has disabled sse";
                LOG(WARNING) << msg;
                return std::make_pair(MetaServiceCode::INVALID_ARGUMENT, msg);
            }
            instance->set_sse_enabled(false);
            instance->set_mtime(
                    duration_cast<seconds>(system_clock::now().time_since_epoch()).count());

            for (auto& obj_info : *(instance->mutable_obj_info())) {
                obj_info.set_sse_enabled(false);
            }
            std::string ret = instance->SerializeAsString();
            if (ret.empty()) {
                msg = "failed to serialize";
                LOG(ERROR) << msg;
                return std::make_pair(MetaServiceCode::PROTOBUF_SERIALIZE_ERR, msg);
            }
            LOG(INFO) << "put instance_id=" << request->instance_id()
                      << "instance disable sse json=" << proto_to_json(*instance);
            return std::make_pair(MetaServiceCode::OK, ret);
        });
    } break;
    // REFRESH reloads the instance in the resource manager; no KV write is involved.
    case AlterInstanceRequest::REFRESH: {
        ret = resource_mgr_->refresh_instance(request->instance_id());
    } break;
    // SET_OVERDUE flags a (non-deleted, not already overdue) instance as OVERDUE.
    case AlterInstanceRequest::SET_OVERDUE: {
        ret = alter_instance(request, [&request](Transaction* txn, InstanceInfoPB* instance) {
            std::string msg;

            if (instance->status() == InstanceInfoPB::DELETED) {
                msg = "can't set deleted instance to overdue, instance_id = " +
                      request->instance_id();
                LOG(WARNING) << msg;
                return std::make_pair(MetaServiceCode::INVALID_ARGUMENT, msg);
            }
            if (instance->status() == InstanceInfoPB::OVERDUE) {
                msg = "the instance has already set instance to overdue, instance_id = " +
                      request->instance_id();
                LOG(WARNING) << msg;
                return std::make_pair(MetaServiceCode::INVALID_ARGUMENT, msg);
            }
            instance->set_status(InstanceInfoPB::OVERDUE);
            instance->set_mtime(
                    duration_cast<seconds>(system_clock::now().time_since_epoch()).count());

            std::string ret = instance->SerializeAsString();
            if (ret.empty()) {
                msg = "failed to serialize";
                LOG(WARNING) << msg;
                return std::make_pair(MetaServiceCode::PROTOBUF_SERIALIZE_ERR, msg);
            }
            LOG(INFO) << "put instance_id=" << request->instance_id()
                      << "set instance overdue json=" << proto_to_json(*instance);
            return std::make_pair(MetaServiceCode::OK, ret);
        });
    } break;
    // SET_NORMAL restores a (non-deleted, non-normal) instance to NORMAL status.
    case AlterInstanceRequest::SET_NORMAL: {
        ret = alter_instance(request, [&request](Transaction* txn, InstanceInfoPB* instance) {
            std::string msg;

            if (instance->status() == InstanceInfoPB::DELETED) {
                msg = "can't set deleted instance to normal, instance_id = " +
                      request->instance_id();
                LOG(WARNING) << msg;
                return std::make_pair(MetaServiceCode::INVALID_ARGUMENT, msg);
            }
            if (instance->status() == InstanceInfoPB::NORMAL) {
                msg = "the instance is already normal, instance_id = " + request->instance_id();
                LOG(WARNING) << msg;
                return std::make_pair(MetaServiceCode::INVALID_ARGUMENT, msg);
            }
            instance->set_status(InstanceInfoPB::NORMAL);
            instance->set_mtime(
                    duration_cast<seconds>(system_clock::now().time_since_epoch()).count());

            std::string ret = instance->SerializeAsString();
            if (ret.empty()) {
                msg = "failed to serialize";
                LOG(WARNING) << msg;
                return std::make_pair(MetaServiceCode::PROTOBUF_SERIALIZE_ERR, msg);
            }
            LOG(INFO) << "put instance_id=" << request->instance_id()
                      << "set instance normal json=" << proto_to_json(*instance);
            return std::make_pair(MetaServiceCode::OK, ret);
        });
    } break;
    /**
     * Handle SET_SNAPSHOT_PROPERTY operation - configures snapshot-related properties for an instance.
     *
     * Supported property keys and their expected values:
     * - "ENABLE_SNAPSHOT": "true" | "false"
     *   Controls whether snapshot functionality is enabled for the instance
     *
     * - "MAX_RESERVED_SNAPSHOTS": numeric string (0-config::snapshot_max_reserved_num)
     *   Sets the maximum number of snapshots to retain for the instance
     *
     * - "SNAPSHOT_INTERVAL_SECONDS": numeric string (config::snapshot_min_interval_seconds-max)
     *   Sets the snapshot creation interval in seconds (minimum controlled by config)
     *
     * Each property is validated by its respective handler function which ensures
     * the provided values conform to the expected format and constraints.
     */
    case AlterInstanceRequest::SET_SNAPSHOT_PROPERTY: {
        ret = alter_instance(request, [&request, &instance_id](Transaction* txn,
                                                               InstanceInfoPB* instance) {
            std::string msg;
            auto properties = request->properties();
            if (properties.empty()) {
                msg = "snapshot properties is empty, instance_id = " + request->instance_id();
                LOG(WARNING) << msg;
                return std::make_pair(MetaServiceCode::INVALID_ARGUMENT, msg);
            }
            // Apply each property via its dedicated handler; the first failure aborts
            // the whole operation (nothing is committed).
            for (const auto& property : properties) {
                std::string key = property.first;
                std::string value = property.second;
                AlterInstanceRequest::SnapshotProperty snapshot_property =
                        AlterInstanceRequest::UNKNOWN_PROPERTY;
                if (!AlterInstanceRequest_SnapshotProperty_Parse(key, &snapshot_property) ||
                    snapshot_property == AlterInstanceRequest::UNKNOWN_PROPERTY) {
                    msg = "unknown snapshot property: " + key;
                    LOG(WARNING) << "alter instance failed: " << msg
                                 << ", instance_id = " << instance_id;
                    return std::make_pair(MetaServiceCode::INVALID_ARGUMENT, msg);
                }

                std::pair<MetaServiceCode, std::string> result;

                if (snapshot_property == AlterInstanceRequest::ENABLE_SNAPSHOT) {
                    result = handle_snapshot_switch(request->instance_id(), value, instance);
                } else if (snapshot_property == AlterInstanceRequest::MAX_RESERVED_SNAPSHOTS) {
                    result = handle_max_reserved_snapshots(request->instance_id(), value, instance);
                } else if (snapshot_property == AlterInstanceRequest::SNAPSHOT_INTERVAL_SECONDS) {
                    result = handle_snapshot_intervals(request->instance_id(), value, instance);
                } else {
                    msg = "unsupported property: " + key;
                    LOG(WARNING) << "alter instance failed: " << msg
                                 << ", instance_id = " << instance_id;
                    return std::make_pair(MetaServiceCode::INVALID_ARGUMENT, msg);
                }

                if (result.first != MetaServiceCode::OK) {
                    return result;
                }
            }

            std::string ret = instance->SerializeAsString();
            if (ret.empty()) {
                msg = "failed to serialize";
                LOG(WARNING) << msg;
                return std::make_pair(MetaServiceCode::PROTOBUF_SERIALIZE_ERR, msg);
            }
            LOG(INFO) << "put instance_id=" << request->instance_id()
                      << "set instance snapshot property json=" << proto_to_json(*instance);
            return std::make_pair(MetaServiceCode::OK, ret);
        });
    } break;
    default: {
        ss << "invalid request op, op=" << request->op();
        ret = std::make_pair(MetaServiceCode::INVALID_ARGUMENT, ss.str());
    }
    }
    code = ret.first;
    msg = ret.second;

    // REFRESH already refreshed the instance above; skip the extra notification.
    if (request->op() == AlterInstanceRequest::REFRESH) return;

    // NOTE(review): this notification runs regardless of whether the op succeeded —
    // confirm that notifying on failure is intended.
    async_notify_refresh_instance(txn_kv_, request->instance_id(), /*include_self=*/false);
}
| |
| void MetaServiceImpl::get_instance(google::protobuf::RpcController* controller, |
| const GetInstanceRequest* request, GetInstanceResponse* response, |
| ::google::protobuf::Closure* done) { |
| RPC_PREPROCESS(get_instance, get); |
| TEST_SYNC_POINT_CALLBACK("get_instance_sk_response", &response); |
| TEST_SYNC_POINT_RETURN_WITH_VOID("get_instance_sk_response_return"); |
| std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : ""; |
| if (cloud_unique_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "cloud_unique_id must be given"; |
| return; |
| } |
| instance_id = get_instance_id(resource_mgr_, cloud_unique_id); |
| if (instance_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "empty instance_id"; |
| LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id; |
| return; |
| } |
| RPC_RATE_LIMIT(get_instance); |
| InstanceKeyInfo key_info {instance_id}; |
| std::string key; |
| std::string val; |
| instance_key(key_info, &key); |
| |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| msg = "failed to create txn"; |
| LOG(WARNING) << msg << " err=" << err; |
| return; |
| } |
| err = txn->get(key, &val); |
| LOG(INFO) << "get instance_key=" << hex(key); |
| |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| ss << "failed to get instance, instance_id=" << instance_id << " err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| |
| InstanceInfoPB instance; |
| if (!instance.ParseFromString(val)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| msg = "failed to parse InstanceInfoPB"; |
| return; |
| } |
| |
| response->mutable_instance()->CopyFrom(instance); |
| } |
| |
| std::pair<MetaServiceCode, std::string> MetaServiceImpl::alter_instance( |
| const cloud::AlterInstanceRequest* request, |
| std::function<std::pair<MetaServiceCode, std::string>(Transaction*, InstanceInfoPB*)> |
| action) { |
| MetaServiceCode code = MetaServiceCode::OK; |
| std::string msg = "OK"; |
| std::string instance_id = request->has_instance_id() ? request->instance_id() : ""; |
| if (instance_id.empty()) { |
| msg = "instance id not set"; |
| LOG(WARNING) << msg; |
| return std::make_pair(MetaServiceCode::INVALID_ARGUMENT, msg); |
| } |
| |
| InstanceKeyInfo key_info {instance_id}; |
| std::string key; |
| std::string val; |
| instance_key(key_info, &key); |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| msg = "failed to create txn"; |
| LOG(WARNING) << msg << " err=" << err; |
| return std::make_pair(cast_as<ErrCategory::CREATE>(err), msg); |
| } |
| |
| // Check existence before proceeding |
| err = txn->get(key, &val); |
| if (err != TxnErrorCode::TXN_OK) { |
| std::stringstream ss; |
| ss << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? "instance not existed" |
| : "internal error failed to check instance") |
| << ", instance_id=" << request->instance_id() << ", err=" << err; |
| // TODO(dx): fix CLUSTER_NOT_FOUND,VERSION_NOT_FOUND,TXN_LABEL_NOT_FOUND,etc to NOT_FOUND |
| code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::CLUSTER_NOT_FOUND |
| : cast_as<ErrCategory::READ>(err); |
| msg = ss.str(); |
| LOG(WARNING) << msg << " err=" << err; |
| return std::make_pair(code, msg); |
| } |
| LOG(INFO) << "alter instance key=" << hex(key); |
| InstanceInfoPB instance; |
| if (!instance.ParseFromString(val)) { |
| msg = "failed to parse InstanceInfoPB"; |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| LOG(WARNING) << msg; |
| return std::make_pair(code, msg); |
| } |
| auto r = action(txn.get(), &instance); |
| if (r.first != MetaServiceCode::OK) { |
| return r; |
| } |
| val = r.second; |
| txn->atomic_add(system_meta_service_instance_update_key(), 1); |
| txn->put(key, val); |
| err = txn->commit(); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::COMMIT>(err); |
| msg = fmt::format("failed to commit kv txn, err={}", err); |
| LOG(WARNING) << msg; |
| return std::make_pair(code, msg); |
| } |
| return std::make_pair(code, msg); |
| } |
| |
| void handle_add_cluster(const std::string& instance_id, const ClusterInfo& cluster, |
| std::shared_ptr<ResourceManager> resource_mgr, std::string& msg, |
| MetaServiceCode& code) { |
| auto r = resource_mgr->add_cluster(instance_id, cluster); |
| code = r.first; |
| msg = r.second; |
| } |
| |
| void handle_drop_cluster(const std::string& instance_id, const ClusterInfo& cluster, |
| std::shared_ptr<ResourceManager> resource_mgr, std::string& msg, |
| MetaServiceCode& code) { |
| auto r = resource_mgr->drop_cluster(instance_id, cluster); |
| code = r.first; |
| msg = r.second; |
| } |
| |
| void handle_update_cluster_mySQL_username(const std::string& instance_id, |
| const ClusterInfo& cluster, |
| std::shared_ptr<ResourceManager> resource_mgr, |
| std::string& msg, MetaServiceCode& code) { |
| msg = resource_mgr->update_cluster( |
| instance_id, cluster, |
| [&](const ClusterPB& i) { return i.cluster_id() == cluster.cluster.cluster_id(); }, |
| [&](ClusterPB& c, std::vector<ClusterPB>&) { |
| auto& mysql_user_names = cluster.cluster.mysql_user_name(); |
| c.mutable_mysql_user_name()->CopyFrom(mysql_user_names); |
| return ""; |
| }); |
| } |
| |
| void handle_add_node(const std::string& instance_id, const AlterClusterRequest* request, |
| std::shared_ptr<ResourceManager> resource_mgr, std::string& msg, |
| MetaServiceCode& code) { |
| resource_mgr->check_cluster_params_valid(request->cluster(), &msg, false, false); |
| if (!msg.empty()) { |
| LOG(WARNING) << msg; |
| return; |
| } |
| std::vector<NodeInfo> to_add; |
| std::vector<NodeInfo> to_del; |
| for (auto& n : request->cluster().nodes()) { |
| NodeInfo node; |
| node.instance_id = request->instance_id(); |
| node.node_info = n; |
| node.cluster_id = request->cluster().cluster_id(); |
| node.cluster_name = request->cluster().cluster_name(); |
| node.role = (request->cluster().type() == ClusterPB::SQL |
| ? Role::SQL_SERVER |
| : (request->cluster().type() == ClusterPB::COMPUTE ? Role::COMPUTE_NODE |
| : Role::UNDEFINED)); |
| node.node_info.set_status(NodeStatusPB::NODE_STATUS_RUNNING); |
| to_add.emplace_back(std::move(node)); |
| } |
| msg = resource_mgr->modify_nodes(instance_id, to_add, to_del); |
| } |
| |
| void handle_drop_node(const std::string& instance_id, const AlterClusterRequest* request, |
| std::shared_ptr<ResourceManager> resource_mgr, std::string& msg, |
| MetaServiceCode& code) { |
| resource_mgr->check_cluster_params_valid(request->cluster(), &msg, false, false); |
| if (!msg.empty()) { |
| LOG(WARNING) << msg; |
| return; |
| } |
| std::vector<NodeInfo> to_add; |
| std::vector<NodeInfo> to_del; |
| for (auto& n : request->cluster().nodes()) { |
| NodeInfo node; |
| node.instance_id = request->instance_id(); |
| node.node_info = n; |
| node.cluster_id = request->cluster().cluster_id(); |
| node.cluster_name = request->cluster().cluster_name(); |
| node.role = (request->cluster().type() == ClusterPB::SQL |
| ? Role::SQL_SERVER |
| : (request->cluster().type() == ClusterPB::COMPUTE ? Role::COMPUTE_NODE |
| : Role::UNDEFINED)); |
| to_del.emplace_back(std::move(node)); |
| } |
| msg = resource_mgr->modify_nodes(instance_id, to_add, to_del); |
| } |
| |
| void handle_decommission_node(const std::string& instance_id, const AlterClusterRequest* request, |
| std::shared_ptr<ResourceManager> resource_mgr, std::string& msg, |
| MetaServiceCode& code) { |
| resource_mgr->check_cluster_params_valid(request->cluster(), &msg, false, false); |
| if (msg != "") { |
| LOG(WARNING) << msg; |
| return; |
| } |
| |
| std::string be_unique_id = (request->cluster().nodes())[0].cloud_unique_id(); |
| std::vector<NodeInfo> nodes; |
| std::string err = resource_mgr->get_node(be_unique_id, &nodes); |
| if (!err.empty()) { |
| LOG(INFO) << "failed to check instance info, err=" << err; |
| msg = err; |
| return; |
| } |
| |
| std::vector<NodeInfo> decomission_nodes; |
| for (auto& node : nodes) { |
| for (auto req_node : request->cluster().nodes()) { |
| bool ip_processed = false; |
| if (node.node_info.has_ip() && req_node.has_ip()) { |
| std::string endpoint = |
| node.node_info.ip() + ":" + std::to_string(node.node_info.heartbeat_port()); |
| std::string req_endpoint = |
| req_node.ip() + ":" + std::to_string(req_node.heartbeat_port()); |
| if (endpoint == req_endpoint) { |
| decomission_nodes.push_back(node); |
| node.node_info.set_status(NodeStatusPB::NODE_STATUS_DECOMMISSIONING); |
| } |
| ip_processed = true; |
| } |
| |
| if (!ip_processed && node.node_info.has_host() && req_node.has_host()) { |
| std::string endpoint = node.node_info.host() + ":" + |
| std::to_string(node.node_info.heartbeat_port()); |
| std::string req_endpoint = |
| req_node.host() + ":" + std::to_string(req_node.heartbeat_port()); |
| if (endpoint == req_endpoint) { |
| decomission_nodes.push_back(node); |
| node.node_info.set_status(NodeStatusPB::NODE_STATUS_DECOMMISSIONING); |
| } |
| } |
| } |
| } |
| |
| { |
| std::vector<NodeInfo> to_add; |
| std::vector<NodeInfo>& to_del = decomission_nodes; |
| msg = resource_mgr->modify_nodes(instance_id, to_add, to_del); |
| } |
| { |
| std::vector<NodeInfo>& to_add = decomission_nodes; |
| std::vector<NodeInfo> to_del; |
| for (auto& node : to_add) { |
| node.node_info.set_status(NodeStatusPB::NODE_STATUS_DECOMMISSIONING); |
| LOG(INFO) << "decomission node, " |
| << "size: " << to_add.size() << " " << node.node_info.DebugString() << " " |
| << node.cluster_id << " " << node.cluster_name; |
| } |
| msg = resource_mgr->modify_nodes(instance_id, to_add, to_del); |
| } |
| } |
| |
| void handle_notify_decommissioned(const std::string& instance_id, |
| const AlterClusterRequest* request, |
| std::shared_ptr<ResourceManager> resource_mgr, std::string& msg, |
| MetaServiceCode& code) { |
| resource_mgr->check_cluster_params_valid(request->cluster(), &msg, false, false); |
| if (msg != "") { |
| LOG(WARNING) << msg; |
| return; |
| } |
| |
| std::string be_unique_id = (request->cluster().nodes())[0].cloud_unique_id(); |
| std::vector<NodeInfo> nodes; |
| std::string err = resource_mgr->get_node(be_unique_id, &nodes); |
| if (!err.empty()) { |
| LOG(INFO) << "failed to check instance info, err=" << err; |
| msg = err; |
| return; |
| } |
| |
| std::vector<NodeInfo> decomission_nodes; |
| for (auto& node : nodes) { |
| for (auto req_node : request->cluster().nodes()) { |
| bool ip_processed = false; |
| if (node.node_info.has_ip() && req_node.has_ip()) { |
| std::string endpoint = |
| node.node_info.ip() + ":" + std::to_string(node.node_info.heartbeat_port()); |
| std::string req_endpoint = |
| req_node.ip() + ":" + std::to_string(req_node.heartbeat_port()); |
| if (endpoint == req_endpoint) { |
| decomission_nodes.push_back(node); |
| } |
| ip_processed = true; |
| } |
| |
| if (!ip_processed && node.node_info.has_host() && req_node.has_host()) { |
| std::string endpoint = node.node_info.host() + ":" + |
| std::to_string(node.node_info.heartbeat_port()); |
| std::string req_endpoint = |
| req_node.host() + ":" + std::to_string(req_node.heartbeat_port()); |
| if (endpoint == req_endpoint) { |
| decomission_nodes.push_back(node); |
| } |
| } |
| } |
| } |
| |
| { |
| std::vector<NodeInfo> to_add; |
| std::vector<NodeInfo>& to_del = decomission_nodes; |
| msg = resource_mgr->modify_nodes(instance_id, to_add, to_del); |
| } |
| { |
| std::vector<NodeInfo>& to_add = decomission_nodes; |
| std::vector<NodeInfo> to_del; |
| for (auto& node : to_add) { |
| node.node_info.set_status(NodeStatusPB::NODE_STATUS_DECOMMISSIONED); |
| LOG(INFO) << "notify node decomissioned, " |
| << " size: " << to_add.size() << " " << node.node_info.DebugString() << " " |
| << node.cluster_id << " " << node.cluster_name; |
| } |
| msg = resource_mgr->modify_nodes(instance_id, to_add, to_del); |
| } |
| } |
| |
| void handle_rename_cluster(const std::string& instance_id, const ClusterInfo& cluster, |
| std::shared_ptr<ResourceManager> resource_mgr, bool replace, |
| std::string& msg, MetaServiceCode& code) { |
| msg = resource_mgr->update_cluster( |
| instance_id, cluster, |
| [&](const ClusterPB& i) { return i.cluster_id() == cluster.cluster.cluster_id(); }, |
| [&](ClusterPB& c, std::vector<ClusterPB>& clusters_in_instance) { |
| std::string msg; |
| std::stringstream ss; |
| std::set<std::string> cluster_names; |
| for (auto cluster_in_instance : clusters_in_instance) { |
| cluster_names.emplace(cluster_in_instance.cluster_name()); |
| } |
| auto it = cluster_names.find(cluster.cluster.cluster_name()); |
| LOG(INFO) << "cluster.cluster.cluster_name(): " << cluster.cluster.cluster_name(); |
| for (auto itt : cluster_names) { |
| LOG(INFO) << "instance's cluster name : " << itt; |
| } |
| if (it != cluster_names.end()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| ss << "failed to rename cluster, a cluster with the same name already exists " |
| "in this instance " |
| << proto_to_json(c); |
| msg = ss.str(); |
| return msg; |
| } |
| if (c.cluster_name() == cluster.cluster.cluster_name()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| ss << "failed to rename cluster, name eq original name, original cluster is " |
| << proto_to_json(c); |
| msg = ss.str(); |
| return msg; |
| } |
| c.set_cluster_name(cluster.cluster.cluster_name()); |
| return msg; |
| }, |
| replace); |
| } |
| |
| void handle_update_cluster_endpoint(const std::string& instance_id, const ClusterInfo& cluster, |
| std::shared_ptr<ResourceManager> resource_mgr, std::string& msg, |
| MetaServiceCode& code) { |
| msg = resource_mgr->update_cluster( |
| instance_id, cluster, |
| [&](const ClusterPB& i) { return i.cluster_id() == cluster.cluster.cluster_id(); }, |
| [&](ClusterPB& c, std::vector<ClusterPB>&) { |
| std::string msg; |
| std::stringstream ss; |
| if (!cluster.cluster.has_private_endpoint() || |
| cluster.cluster.private_endpoint().empty()) { |
| code = MetaServiceCode::CLUSTER_ENDPOINT_MISSING; |
| ss << "missing private endpoint"; |
| msg = ss.str(); |
| return msg; |
| } |
| c.set_public_endpoint(cluster.cluster.public_endpoint()); |
| c.set_private_endpoint(cluster.cluster.private_endpoint()); |
| return msg; |
| }); |
| } |
| |
| void handle_set_cluster_status(const std::string& instance_id, const ClusterInfo& cluster, |
| std::shared_ptr<ResourceManager> resource_mgr, std::string& msg, |
| MetaServiceCode& code) { |
| msg = resource_mgr->update_cluster( |
| instance_id, cluster, |
| [&](const ClusterPB& i) { return i.cluster_id() == cluster.cluster.cluster_id(); }, |
| [&](ClusterPB& c, std::vector<ClusterPB>&) { |
| std::string msg; |
| std::stringstream ss; |
| if (ClusterPB::COMPUTE != c.type()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| ss << "just support set COMPUTE cluster status"; |
| msg = ss.str(); |
| return msg; |
| } |
| if (c.cluster_status() == cluster.cluster.cluster_status()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| ss << "failed to set cluster status, status eq original status, original " |
| "cluster is " |
| << print_cluster_status(c.cluster_status()); |
| msg = ss.str(); |
| return msg; |
| } |
| // status from -> to |
| std::set<std::pair<cloud::ClusterStatus, cloud::ClusterStatus>> |
| can_work_directed_edges { |
| {ClusterStatus::UNKNOWN, ClusterStatus::NORMAL}, |
| {ClusterStatus::NORMAL, ClusterStatus::SUSPENDED}, |
| {ClusterStatus::SUSPENDED, ClusterStatus::TO_RESUME}, |
| {ClusterStatus::TO_RESUME, ClusterStatus::NORMAL}, |
| {ClusterStatus::SUSPENDED, ClusterStatus::NORMAL}, |
| {ClusterStatus::NORMAL, ClusterStatus::MANUAL_SHUTDOWN}, |
| {ClusterStatus::MANUAL_SHUTDOWN, ClusterStatus::NORMAL}, |
| }; |
| auto from = c.cluster_status(); |
| auto to = cluster.cluster.cluster_status(); |
| if (can_work_directed_edges.count({from, to}) == 0) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| ss << "failed to set cluster status, original cluster is " |
| << print_cluster_status(from) << " and want set " |
| << print_cluster_status(to); |
| msg = ss.str(); |
| return msg; |
| } |
| c.set_cluster_status(cluster.cluster.cluster_status()); |
| return msg; |
| }); |
| } |
| |
// Alter a virtual cluster (matched by cluster_id): replace its sub-cluster
// name list and/or merge the fields of its cluster policy, validating each
// referenced sub-cluster against the other clusters of the instance, then
// validate the resulting virtual cluster as a whole.
// Note: the structured bindings `auto [ret_code, msg]` below intentionally
// shadow the outer `msg`; a validation failure returns the inner message.
void handle_alter_vcluster_info(const std::string& instance_id, const ClusterInfo& cluster,
                                std::shared_ptr<ResourceManager> resource_mgr, std::string& msg,
                                MetaServiceCode& code) {
    msg = resource_mgr->update_cluster(
            instance_id, cluster,
            [&](const ClusterPB& i) { return i.cluster_id() == cluster.cluster.cluster_id(); },
            [&](ClusterPB& c, std::vector<ClusterPB>& clusters_in_instance) {
                std::string msg;
                // Remove the vcluster itself from the candidate list so that
                // sub-cluster validation only sees the other clusters.
                for (auto it = clusters_in_instance.begin(); it != clusters_in_instance.end();) {
                    if (c.cluster_name() == it->cluster_name()) {
                        it = clusters_in_instance.erase(it);
                    } else {
                        ++it;
                    }
                }
                // Replace the sub-cluster name list only when the request
                // carries a non-empty one; each name must validate.
                if (cluster.cluster.cluster_names_size() > 0) {
                    c.clear_cluster_names(); // Clear existing cluster names
                    for (const auto& name : cluster.cluster.cluster_names()) {
                        auto [ret_code, msg] =
                                resource_mgr->validate_sub_clusters({name}, clusters_in_instance);
                        if (ret_code != MetaServiceCode::OK) {
                            LOG(WARNING) << msg;
                            return msg;
                        }
                        c.add_cluster_names(name); // Add each new name
                    }
                }

                // Merge cluster policy fields if the request carries a policy;
                // fields absent from the request keep their current values.
                if (cluster.cluster.has_cluster_policy()) {
                    const auto& policy = cluster.cluster.cluster_policy();
                    if (policy.has_active_cluster_name()) {
                        auto [ret_code, msg] = resource_mgr->validate_sub_clusters(
                                {policy.active_cluster_name()}, clusters_in_instance);
                        if (ret_code != MetaServiceCode::OK) {
                            LOG(WARNING) << msg;
                            return msg;
                        }
                        c.mutable_cluster_policy()->set_active_cluster_name(
                                policy.active_cluster_name());
                    }

                    for (const auto& standby_name : policy.standby_cluster_names()) {
                        auto [ret_code, msg] = resource_mgr->validate_sub_clusters(
                                {standby_name}, clusters_in_instance);
                        if (ret_code != MetaServiceCode::OK) {
                            LOG(WARNING) << msg;
                            return msg;
                        }
                        c.mutable_cluster_policy()->clear_standby_cluster_names();
                        c.mutable_cluster_policy()->add_standby_cluster_names(standby_name);
                        // Currently only one standby cluster is supported, so
                        // only the first validated name is kept.
                        break;
                    }

                    if (policy.has_type()) {
                        c.mutable_cluster_policy()->set_type(policy.type());
                    }

                    if (policy.has_failover_failure_threshold()) {
                        c.mutable_cluster_policy()->set_failover_failure_threshold(
                                policy.failover_failure_threshold());
                    }

                    if (policy.has_unhealthy_node_threshold_percent()) {
                        c.mutable_cluster_policy()->set_unhealthy_node_threshold_percent(
                                policy.unhealthy_node_threshold_percent());
                    }

                    // Replace (not append) the warmup job id list when the
                    // request carries one.
                    if (!policy.cache_warmup_jobids().empty()) {
                        c.mutable_cluster_policy()->clear_cache_warmup_jobids();
                    }

                    for (const auto& warmup_jobid : policy.cache_warmup_jobids()) {
                        c.mutable_cluster_policy()->add_cache_warmup_jobids(warmup_jobid);
                    }
                }

                // Validate the virtual cluster after alterations
                if (!resource_mgr->validate_virtual_cluster(c, &msg)) {
                    return msg; // Return validation error
                }
                return msg; // Return success or empty message
            });
}
| |
| void handle_alter_properties(const std::string& instance_id, const ClusterInfo& cluster, |
| std::shared_ptr<ResourceManager> resource_mgr, std::string& msg, |
| MetaServiceCode& code) { |
| msg = resource_mgr->update_cluster( |
| instance_id, cluster, |
| [&](const ClusterPB& i) { return i.cluster_id() == cluster.cluster.cluster_id(); }, |
| [&](ClusterPB& c, std::vector<ClusterPB>&) { |
| std::string msg; |
| std::stringstream ss; |
| if (ClusterPB::COMPUTE != c.type()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| ss << "just support set COMPUTE cluster status"; |
| msg = ss.str(); |
| return msg; |
| } |
| *c.mutable_properties() = cluster.cluster.properties(); |
| return msg; |
| }); |
| } |
| |
// RPC entry point for all cluster mutations: add/drop cluster, add/drop/
// decommission node, rename, endpoint/status/vcluster/property updates.
// Resolves the instance id, validates the request, dispatches to the
// matching handle_* helper by request->op(), then notifies other meta
// service nodes to refresh the instance cache.
// RPC_PREPROCESS declares the locals used below (code, msg, ss, instance_id,
// stats, ...) and installs the guard that fills the response on return.
void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller,
                                    const AlterClusterRequest* request,
                                    AlterClusterResponse* response,
                                    ::google::protobuf::Closure* done) {
    RPC_PREPROCESS(alter_cluster, get);
    std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
    instance_id = request->has_instance_id() ? request->instance_id() : "";

    // No explicit instance_id: derive it from the cloud_unique_id.
    if (!cloud_unique_id.empty() && instance_id.empty()) {
        auto [is_degraded_format, id] =
                ResourceManager::get_instance_id_by_cloud_unique_id(cloud_unique_id);
        // Degraded-format ids must be registered when the check is enabled.
        if (config::enable_check_instance_id && is_degraded_format &&
            !resource_mgr_->is_instance_id_registered(id)) {
            msg = "use degrade cloud_unique_id, but instance_id invalid, cloud_unique_id=" +
                  cloud_unique_id;
            LOG(WARNING) << msg;
            code = MetaServiceCode::INVALID_ARGUMENT;
            return;
        }
        instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
        if (instance_id.empty()) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            msg = "empty instance_id";
            LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id;
            return;
        }
    }

    if (instance_id.empty() || !request->has_cluster()) {
        msg = "invalid request instance_id or cluster not given";
        code = MetaServiceCode::INVALID_ARGUMENT;
        return;
    }

    if (!request->has_op()) {
        msg = "op not given";
        code = MetaServiceCode::INVALID_ARGUMENT;
        return;
    }

    LOG(INFO) << "alter cluster instance_id=" << instance_id << " op=" << request->op();
    ClusterInfo cluster;
    cluster.cluster.CopyFrom(request->cluster());

    // Dispatch to the per-op handler; each handler reports failure through
    // msg and/or code.
    switch (request->op()) {
    case AlterClusterRequest::ADD_CLUSTER:
        handle_add_cluster(instance_id, cluster, resource_mgr(), msg, code);
        break;
    case AlterClusterRequest::DROP_CLUSTER:
        handle_drop_cluster(instance_id, cluster, resource_mgr(), msg, code);
        break;
    case AlterClusterRequest::UPDATE_CLUSTER_MYSQL_USER_NAME:
        handle_update_cluster_mySQL_username(instance_id, cluster, resource_mgr(), msg, code);
        break;
    case AlterClusterRequest::ADD_NODE:
        handle_add_node(instance_id, request, resource_mgr(), msg, code);
        break;
    case AlterClusterRequest::DROP_NODE:
        handle_drop_node(instance_id, request, resource_mgr(), msg, code);
        break;
    case AlterClusterRequest::DECOMMISSION_NODE:
        handle_decommission_node(instance_id, request, resource_mgr(), msg, code);
        break;
    case AlterClusterRequest::NOTIFY_DECOMMISSIONED:
        handle_notify_decommissioned(instance_id, request, resource_mgr(), msg, code);
        break;
    case AlterClusterRequest::RENAME_CLUSTER: {
        // SQL mode, cluster cluster name eq empty cluster name, need drop empty cluster first.
        // but in http api, cloud control will drop empty cluster
        bool replace_if_existing_empty_target_cluster =
                request->has_replace_if_existing_empty_target_cluster()
                        ? request->replace_if_existing_empty_target_cluster()
                        : false;
        handle_rename_cluster(instance_id, cluster, resource_mgr(),
                              replace_if_existing_empty_target_cluster, msg, code);
        break;
    }
    case AlterClusterRequest::UPDATE_CLUSTER_ENDPOINT:
        handle_update_cluster_endpoint(instance_id, cluster, resource_mgr(), msg, code);
        break;
    case AlterClusterRequest::SET_CLUSTER_STATUS:
        handle_set_cluster_status(instance_id, cluster, resource_mgr(), msg, code);
        break;
    case AlterClusterRequest::ALTER_VCLUSTER_INFO:
        handle_alter_vcluster_info(instance_id, cluster, resource_mgr(), msg, code);
        break;
    case AlterClusterRequest::ALTER_PROPERTIES:
        handle_alter_properties(instance_id, cluster, resource_mgr(), msg, code);
        break;
    default:
        code = MetaServiceCode::INVALID_ARGUMENT;
        ss << "invalid request op, op=" << request->op();
        msg = ss.str();
        return;
    }

    // Handlers signal failure via a non-empty msg even when they leave code
    // untouched; normalize that to INVALID_ARGUMENT.
    if (!msg.empty() && code == MetaServiceCode::OK) {
        code = MetaServiceCode::INVALID_ARGUMENT;
    }

    // ugly but easy to repair
    // not change cloud.proto add err_code
    if (request->op() == AlterClusterRequest::DROP_NODE &&
        msg.find("not found") != std::string::npos) {
        // see convert_ms_code_to_http_code, reuse CLUSTER_NOT_FOUND, return http status code 404
        code = MetaServiceCode::CLUSTER_NOT_FOUND;
    }

    if (code != MetaServiceCode::OK) return;

    // NOTE(review): this uses request->instance_id(), which may be empty when
    // the instance id was resolved from cloud_unique_id above — confirm
    // whether the resolved `instance_id` should be passed instead.
    async_notify_refresh_instance(
            txn_kv_, request->instance_id(), /*include_self=*/false,
            [instance_id = request->instance_id()](const KVStats& stats) {
                if (config::use_detailed_metrics && !instance_id.empty()) {
                    g_bvar_rpc_kv_alter_cluster_get_bytes.put({instance_id}, stats.get_bytes);
                    g_bvar_rpc_kv_alter_cluster_get_counter.put({instance_id}, stats.get_counter);
                }
            });
}
| |
// RPC entry point: return cluster(s) of an instance selected by cluster_id,
// cluster_name or mysql_user_name (in that priority order); with none given,
// return every cluster of the instance. RPC_PREPROCESS declares the locals
// used below (code, msg, ss, instance_id, txn, ...).
void MetaServiceImpl::get_cluster(google::protobuf::RpcController* controller,
                                  const GetClusterRequest* request, GetClusterResponse* response,
                                  ::google::protobuf::Closure* done) {
    RPC_PREPROCESS(get_cluster, get, put);
    std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
    std::string cluster_id = request->has_cluster_id() ? request->cluster_id() : "";
    std::string cluster_name = request->has_cluster_name() ? request->cluster_name() : "";
    std::string mysql_user_name = request->has_mysql_user_name() ? request->mysql_user_name() : "";

    if (cloud_unique_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "cloud_unique_id must be given";
        return;
    }

    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
    if (instance_id.empty()) {
        // Fall back to the instance_id carried in the request, if any.
        if (request->has_instance_id()) {
            instance_id = request->instance_id();
            // FIXME(gavin): this mechanism benifits debugging and
            // administration, is it dangerous?
            LOG(WARNING) << "failed to get instance_id with cloud_unique_id=" << cloud_unique_id
                         << " use the given instance_id=" << instance_id << " instead";
        } else {
            code = MetaServiceCode::INVALID_ARGUMENT;
            msg = "empty instance_id";
            LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id;
            return;
        }
    }
    RPC_RATE_LIMIT(get_cluster)
    // ATTN: if the case that multiple conditions are satisfied, just use by this order:
    // cluster_id -> cluster_name -> mysql_user_name
    if (!cluster_id.empty()) {
        cluster_name = "";
        mysql_user_name = "";
    } else if (!cluster_name.empty()) {
        mysql_user_name = "";
    }

    bool get_all_cluster_info = false;
    // if cluster_id、cluster_name、mysql_user_name all empty, get this instance's all cluster info.
    if (cluster_id.empty() && cluster_name.empty() && mysql_user_name.empty()) {
        get_all_cluster_info = true;
    }

    if (instance_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "failed to get instance_id with cloud_unique_id=" + cloud_unique_id;
        return;
    }

    // Load the InstanceInfoPB that holds all clusters of the instance.
    InstanceKeyInfo key_info {instance_id};
    std::string key;
    std::string val;
    instance_key(key_info, &key);

    TxnErrorCode err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::CREATE>(err);
        msg = "failed to create txn";
        LOG(WARNING) << msg << " err=" << err;
        return;
    }
    err = txn->get(key, &val);
    LOG(INFO) << "get instance_key=" << hex(key);

    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::READ>(err);
        ss << "failed to get instance, instance_id=" << instance_id << " err=" << err;
        msg = ss.str();
        return;
    }

    InstanceInfoPB instance;
    if (!instance.ParseFromString(val)) {
        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
        msg = "failed to parse InstanceInfoPB";
        return;
    }

    response->set_enable_storage_vault(instance.enable_storage_vault());
    // Informational only: warn (rate-limited) when vaults are enabled but the
    // builtin vault is absent.
    if (instance.enable_storage_vault() &&
        std::find_if(instance.storage_vault_names().begin(), instance.storage_vault_names().end(),
                     [](const std::string& name) { return name == BUILT_IN_STORAGE_VAULT_NAME; }) ==
                instance.storage_vault_names().end()) {
        LOG_EVERY_N(INFO, 100) << "There is no builtin vault in instance "
                               << instance.instance_id();
    }

    // Collect a cluster's mysql user names into a set for membership tests.
    auto get_cluster_mysql_user = [](const ClusterPB& c, std::set<std::string>* mysql_users) {
        for (int i = 0; i < c.mysql_user_name_size(); i++) {
            mysql_users->emplace(c.mysql_user_name(i));
        }
    };

    if (get_all_cluster_info) {
        response->mutable_cluster()->CopyFrom(instance.clusters());
        // NOTE(review): `msg` is empty here — this log line appears to be a
        // leftover; confirm what was meant to be logged.
        LOG_EVERY_N(INFO, 100) << "get all cluster info, " << msg;
    } else {
        // NOTE(review): is_instance_changed is never set to true in this
        // function, so the commit branch below is currently dead code —
        // confirm whether a write-back path was removed or is still planned.
        bool is_instance_changed = false;
        for (int i = 0; i < instance.clusters_size(); ++i) {
            auto& c = instance.clusters(i);
            std::set<std::string> mysql_users;
            get_cluster_mysql_user(c, &mysql_users);
            // The last wins if add_cluster() does not ensure uniqueness of
            // cluster_id and cluster_name respectively
            if ((c.has_cluster_name() && c.cluster_name() == cluster_name) ||
                (c.has_cluster_id() && c.cluster_id() == cluster_id) ||
                mysql_users.count(mysql_user_name)) {
                // just one cluster
                response->add_cluster()->CopyFrom(c);
                LOG_EVERY_N(INFO, 100) << "found a cluster, instance_id=" << instance.instance_id()
                                       << " cluster=" << msg;
            }
        }
        if (is_instance_changed) {
            val = instance.SerializeAsString();
            if (val.empty()) {
                msg = "failed to serialize";
                code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
                return;
            }

            txn->put(key, val);
            LOG(INFO) << "put instance_id=" << instance_id << " instance_key=" << hex(key)
                      << " json=" << proto_to_json(instance);
            err = txn->commit();
            if (err != TxnErrorCode::TXN_OK) {
                code = cast_as<ErrCategory::COMMIT>(err);
                msg = fmt::format("failed to commit kv txn, err={}", err);
                LOG(WARNING) << msg;
            }
        }
    }

    // No cluster matched the request: report CLUSTER_NOT_FOUND (maps to a
    // 404 in the HTTP layer).
    if (response->cluster().empty()) {
        ss << "fail to get cluster with " << request->ShortDebugString();
        msg = ss.str();
        std::replace(msg.begin(), msg.end(), '\n', ' ');
        code = MetaServiceCode::CLUSTER_NOT_FOUND;
    }
} // get_cluster
| |
// RPC entry point: add a stage (INTERNAL or EXTERNAL) to an instance.
// Validates the request, checks stage-count limits and duplicates, builds
// the stage (deriving the object prefix for INTERNAL stages, encrypting
// ak/sk for EXTERNAL stages with object info), and persists the updated
// InstanceInfoPB in a single transaction. RPC_PREPROCESS declares the
// locals used below (code, msg, ss, instance_id, txn, ...).
void MetaServiceImpl::create_stage(::google::protobuf::RpcController* controller,
                                   const CreateStageRequest* request, CreateStageResponse* response,
                                   ::google::protobuf::Closure* done) {
    TEST_SYNC_POINT_CALLBACK("create_stage_sk_request", const_cast<CreateStageRequest**>(&request));
    RPC_PREPROCESS(create_stage, get, put);
    TEST_SYNC_POINT_RETURN_WITH_VOID("create_stage_sk_request_return");
    std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
    if (cloud_unique_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "cloud unique id not set";
        return;
    }

    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
    if (instance_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "empty instance_id";
        LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id;
        return;
    }
    RPC_RATE_LIMIT(create_stage)

    // --- Request validation: type always required; name only for EXTERNAL;
    // stage_id always; INTERNAL additionally needs mysql user name + id. ---
    if (!request->has_stage()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "stage not set";
        return;
    }
    auto stage = request->stage();

    if (!stage.has_type()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "stage type not set";
        return;
    }

    if (stage.name().empty() && stage.type() == StagePB::EXTERNAL) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "stage name not set";
        return;
    }
    if (stage.stage_id().empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "stage id not set";
        return;
    }

    if (stage.type() == StagePB::INTERNAL) {
        if (stage.mysql_user_name().empty() || stage.mysql_user_id().empty()) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            ss << "internal stage must have a mysql user name and id must be given, name size="
               << stage.mysql_user_name_size() << " id size=" << stage.mysql_user_id_size();
            msg = ss.str();
            LOG(WARNING) << msg;
            return;
        }
    }

    // Load the instance record that owns the stage list.
    InstanceKeyInfo key_info {instance_id};
    std::string key;
    std::string val;
    instance_key(key_info, &key);

    TxnErrorCode err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::CREATE>(err);
        msg = "failed to create txn";
        LOG(WARNING) << msg << " err=" << err;
        return;
    }
    err = txn->get(key, &val);
    LOG(INFO) << "get instance_key=" << hex(key);

    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::READ>(err);
        ss << "failed to get instance, instance_id=" << instance_id << " err=" << err;
        msg = ss.str();
        return;
    }

    InstanceInfoPB instance;
    if (!instance.ParseFromString(val)) {
        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
        msg = "failed to parse InstanceInfoPB";
        return;
    }

    // Enforce the configured per-instance stage limit.
    VLOG_DEBUG << "config stages num=" << config::max_num_stages;
    if (instance.stages_size() >= config::max_num_stages) {
        code = MetaServiceCode::UNDEFINED_ERR;
        msg = "this instance has greater than config num stages";
        LOG(WARNING) << "can't create more than config num stages, and instance has "
                     << std::to_string(instance.stages_size());
        return;
    }

    // check if the stage exists
    for (int i = 0; i < instance.stages_size(); ++i) {
        auto& s = instance.stages(i);
        if (stage.type() == StagePB::INTERNAL) {
            // check all internal stage format is right
            if (s.type() == StagePB::INTERNAL && s.mysql_user_id_size() == 0) {
                LOG(WARNING) << "impossible, internal stage must have at least one id instance="
                             << proto_to_json(instance);
            }

            // NOTE(review): if the malformed case above ever occurs
            // (mysql_user_id_size() == 0), s.mysql_user_id(0) below would be
            // an out-of-range access — confirm this is truly unreachable.
            if (s.type() == StagePB::INTERNAL &&
                (s.mysql_user_id(0) == stage.mysql_user_id(0) ||
                 s.mysql_user_name(0) == stage.mysql_user_name(0))) {
                code = MetaServiceCode::ALREADY_EXISTED;
                msg = "stage already exist";
                ss << "stage already exist, req user_name=" << stage.mysql_user_name(0)
                   << " existed user_name=" << s.mysql_user_name(0)
                   << "req user_id=" << stage.mysql_user_id(0)
                   << " existed user_id=" << s.mysql_user_id(0);
                return;
            }
        }

        // External stages are keyed by name.
        if (stage.type() == StagePB::EXTERNAL) {
            if (s.name() == stage.name()) {
                code = MetaServiceCode::ALREADY_EXISTED;
                msg = "stage already exist";
                return;
            }
        }

        // stage_id must be unique regardless of stage type.
        if (s.stage_id() == stage.stage_id()) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            msg = "stage id is duplicated";
            return;
        }
    }

    if (stage.type() == StagePB::INTERNAL) {
        if (instance.obj_info_size() == 0) {
            LOG(WARNING) << "impossible, instance must have at least one obj_info.";
            code = MetaServiceCode::UNDEFINED_ERR;
            msg = "impossible, instance must have at least one obj_info.";
            return;
        }
        // Internal stages live under the most recent object store of the
        // instance, at a per-user prefix.
        auto& lastest_obj = instance.obj_info()[instance.obj_info_size() - 1];
        // ${obj_prefix}/stage/{username}/{user_id}
        std::string mysql_user_name = stage.mysql_user_name(0);
        std::string prefix = fmt::format("{}/stage/{}/{}", lastest_obj.prefix(), mysql_user_name,
                                         stage.mysql_user_id(0));
        auto as = instance.add_stages();
        as->mutable_obj_info()->set_prefix(prefix);
        as->mutable_obj_info()->set_id(lastest_obj.id());
        as->add_mysql_user_name(mysql_user_name);
        as->add_mysql_user_id(stage.mysql_user_id(0));
        as->set_stage_id(stage.stage_id());
    } else if (stage.type() == StagePB::EXTERNAL) {
        if (!stage.has_obj_info()) {
            // External stage without object info is stored as-is.
            instance.add_stages()->CopyFrom(stage);
        } else {
            // Encrypt the ak/sk of the stage's object info before persisting.
            StagePB tmp_stage;
            tmp_stage.CopyFrom(stage);
            auto obj_info = tmp_stage.mutable_obj_info();
            EncryptionInfoPB encryption_info;
            AkSkPair cipher_ak_sk_pair;
            if (encrypt_ak_sk_helper(obj_info->ak(), obj_info->sk(), &encryption_info,
                                     &cipher_ak_sk_pair, code, msg) != 0) {
                return;
            }
            obj_info->set_ak(std::move(cipher_ak_sk_pair.first));
            obj_info->set_sk(std::move(cipher_ak_sk_pair.second));
            obj_info->mutable_encryption_info()->CopyFrom(encryption_info);
            instance.add_stages()->CopyFrom(tmp_stage);
        }
    }
    val = instance.SerializeAsString();
    if (val.empty()) {
        msg = "failed to serialize";
        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
        return;
    }

    // Bump the instance-update counter so other nodes refresh their caches,
    // then persist the updated instance.
    txn->atomic_add(system_meta_service_instance_update_key(), 1);
    txn->put(key, val);
    LOG(INFO) << "put instance_id=" << instance_id << " instance_key=" << hex(key)
              << " json=" << proto_to_json(instance);
    err = txn->commit();
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::COMMIT>(err);
        msg = fmt::format("failed to commit kv txn, err={}", err);
        LOG(WARNING) << msg;
    }
}
| |
| void MetaServiceImpl::get_stage(google::protobuf::RpcController* controller, |
| const GetStageRequest* request, GetStageResponse* response, |
| ::google::protobuf::Closure* done) { |
| RPC_PREPROCESS(get_stage, get); |
| TEST_SYNC_POINT_CALLBACK("stage_sk_response", &response); |
| TEST_SYNC_POINT_RETURN_WITH_VOID("stage_sk_response_return"); |
| std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : ""; |
| if (cloud_unique_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "cloud unique id not set"; |
| return; |
| } |
| |
| instance_id = get_instance_id(resource_mgr_, cloud_unique_id); |
| if (instance_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "empty instance_id"; |
| LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id; |
| return; |
| } |
| RPC_RATE_LIMIT(get_stage) |
| if (!request->has_type()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "stage type not set"; |
| return; |
| } |
| auto type = request->type(); |
| |
| InstanceKeyInfo key_info {instance_id}; |
| std::string key; |
| std::string val; |
| instance_key(key_info, &key); |
| |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| msg = "failed to create txn"; |
| LOG(WARNING) << msg << " err=" << err; |
| return; |
| } |
| err = txn->get(key, &val); |
| LOG(INFO) << "get instance_key=" << hex(key); |
| |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| ss << "failed to get instance, instance_id=" << instance_id << " err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| |
| InstanceInfoPB instance; |
| if (!instance.ParseFromString(val)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| msg = "failed to parse InstanceInfoPB"; |
| return; |
| } |
| |
| if (type == StagePB::INTERNAL) { |
| auto mysql_user_name = request->has_mysql_user_name() ? request->mysql_user_name() : ""; |
| if (mysql_user_name.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "mysql user name not set"; |
| return; |
| } |
| auto mysql_user_id = request->has_mysql_user_id() ? request->mysql_user_id() : ""; |
| if (mysql_user_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "mysql user id not set"; |
| return; |
| } |
| |
| // check mysql user_name has been created internal stage |
| auto& stage = instance.stages(); |
| bool found = false; |
| if (instance.obj_info_size() == 0) { |
| LOG(WARNING) << "impossible, instance must have at least one obj_info."; |
| code = MetaServiceCode::UNDEFINED_ERR; |
| msg = "impossible, instance must have at least one obj_info."; |
| return; |
| } |
| |
| for (auto s : stage) { |
| if (s.type() != StagePB::INTERNAL) { |
| continue; |
| } |
| if (s.mysql_user_name().size() == 0 || s.mysql_user_id().size() == 0) { |
| LOG(WARNING) << "impossible here, internal stage must have at least one user, " |
| "invalid stage=" |
| << proto_to_json(s); |
| continue; |
| } |
| if (s.mysql_user_name(0) == mysql_user_name) { |
| StagePB stage_pb; |
| // internal stage id is user_id, if user_id not eq internal stage's user_id, del it. |
| // let fe create a new internal stage |
| if (s.mysql_user_id(0) != mysql_user_id) { |
| LOG(INFO) << "ABA user=" << mysql_user_name |
| << " internal stage original user_id=" << s.mysql_user_id()[0] |
| << " rpc user_id=" << mysql_user_id |
| << " stage info=" << proto_to_json(s); |
| code = MetaServiceCode::STATE_ALREADY_EXISTED_FOR_USER; |
| msg = "aba user, drop stage and create a new one"; |
| // response return to be dropped stage id. |
| stage_pb.CopyFrom(s); |
| response->add_stage()->CopyFrom(stage_pb); |
| return; |
| } |
| // find, use it stage prefix and id |
| found = true; |
| // get from internal stage |
| int idx = stoi(s.obj_info().id()); |
| if (idx > instance.obj_info().size() || idx < 1) { |
| LOG(WARNING) << "invalid idx: " << idx; |
| code = MetaServiceCode::UNDEFINED_ERR; |
| msg = "impossible, id invalid"; |
| return; |
| } |
| auto& old_obj = instance.obj_info()[idx - 1]; |
| |
| stage_pb.mutable_obj_info()->set_ak(old_obj.ak()); |
| stage_pb.mutable_obj_info()->set_sk(old_obj.sk()); |
| if (old_obj.has_encryption_info()) { |
| AkSkPair plain_ak_sk_pair; |
| int ret = decrypt_ak_sk_helper(old_obj.ak(), old_obj.sk(), |
| old_obj.encryption_info(), &plain_ak_sk_pair, |
| code, msg); |
| if (ret != 0) return; |
| stage_pb.mutable_obj_info()->set_ak(std::move(plain_ak_sk_pair.first)); |
| stage_pb.mutable_obj_info()->set_sk(std::move(plain_ak_sk_pair.second)); |
| } |
| stage_pb.mutable_obj_info()->set_bucket(old_obj.bucket()); |
| stage_pb.mutable_obj_info()->set_endpoint(old_obj.endpoint()); |
| stage_pb.mutable_obj_info()->set_external_endpoint(old_obj.external_endpoint()); |
| stage_pb.mutable_obj_info()->set_region(old_obj.region()); |
| stage_pb.mutable_obj_info()->set_provider(old_obj.provider()); |
| stage_pb.mutable_obj_info()->set_prefix(s.obj_info().prefix()); |
| stage_pb.set_stage_id(s.stage_id()); |
| stage_pb.set_type(s.type()); |
| response->add_stage()->CopyFrom(stage_pb); |
| return; |
| } |
| } |
| if (!found) { |
| LOG(INFO) << "user=" << mysql_user_name |
| << " not have a valid stage, rpc user_id=" << mysql_user_id; |
| code = MetaServiceCode::STAGE_NOT_FOUND; |
| msg = "stage not found, create a new one"; |
| return; |
| } |
| } |
| |
| // get all external stages for display, but don't show ak/sk, so there is no need to decrypt ak/sk. |
| if (type == StagePB::EXTERNAL && !request->has_stage_name()) { |
| for (int i = 0; i < instance.stages_size(); ++i) { |
| auto& s = instance.stages(i); |
| if (s.type() != StagePB::EXTERNAL) { |
| continue; |
| } |
| response->add_stage()->CopyFrom(s); |
| } |
| return; |
| } |
| |
| // get external stage with the specified stage name |
| for (int i = 0; i < instance.stages_size(); ++i) { |
| auto& s = instance.stages(i); |
| if (s.type() == type && s.name() == request->stage_name()) { |
| StagePB stage; |
| stage.CopyFrom(s); |
| if (!stage.has_access_type() || stage.access_type() == StagePB::AKSK) { |
| stage.set_access_type(StagePB::AKSK); |
| auto obj_info = stage.mutable_obj_info(); |
| if (obj_info->has_encryption_info()) { |
| AkSkPair plain_ak_sk_pair; |
| int ret = decrypt_ak_sk_helper(obj_info->ak(), obj_info->sk(), |
| obj_info->encryption_info(), &plain_ak_sk_pair, |
| code, msg); |
| if (ret != 0) return; |
| obj_info->set_ak(std::move(plain_ak_sk_pair.first)); |
| obj_info->set_sk(std::move(plain_ak_sk_pair.second)); |
| } |
| } else if (stage.access_type() == StagePB::BUCKET_ACL) { |
| if (!instance.has_ram_user()) { |
| ss << "instance does not have ram user"; |
| msg = ss.str(); |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| return; |
| } |
| if (instance.ram_user().has_encryption_info()) { |
| AkSkPair plain_ak_sk_pair; |
| int ret = decrypt_ak_sk_helper( |
| instance.ram_user().ak(), instance.ram_user().sk(), |
| instance.ram_user().encryption_info(), &plain_ak_sk_pair, code, msg); |
| if (ret != 0) return; |
| stage.mutable_obj_info()->set_ak(std::move(plain_ak_sk_pair.first)); |
| stage.mutable_obj_info()->set_sk(std::move(plain_ak_sk_pair.second)); |
| } else { |
| stage.mutable_obj_info()->set_ak(instance.ram_user().ak()); |
| stage.mutable_obj_info()->set_sk(instance.ram_user().sk()); |
| } |
| } else if (stage.access_type() == StagePB::IAM) { |
| std::string val; |
| TxnErrorCode err = txn->get(system_meta_service_arn_info_key(), &val); |
| if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| // For compatibility, use arn_info of config |
| stage.mutable_obj_info()->set_ak(config::arn_ak); |
| stage.mutable_obj_info()->set_sk(config::arn_sk); |
| stage.set_external_id(instance_id); |
| } else if (err == TxnErrorCode::TXN_OK) { |
| RamUserPB iam_user; |
| if (!iam_user.ParseFromString(val)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| msg = "failed to parse RamUserPB"; |
| return; |
| } |
| AkSkPair plain_ak_sk_pair; |
| int ret = decrypt_ak_sk_helper(iam_user.ak(), iam_user.sk(), |
| iam_user.encryption_info(), &plain_ak_sk_pair, |
| code, msg); |
| if (ret != 0) return; |
| stage.mutable_obj_info()->set_ak(std::move(plain_ak_sk_pair.first)); |
| stage.mutable_obj_info()->set_sk(std::move(plain_ak_sk_pair.second)); |
| stage.set_external_id(instance_id); |
| } else { |
| code = cast_as<ErrCategory::READ>(err); |
| ss << "failed to get arn_info_key, err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| } |
| response->add_stage()->CopyFrom(stage); |
| return; |
| } |
| } |
| |
| ss << "stage not found with " << proto_to_json(*request); |
| msg = ss.str(); |
| code = MetaServiceCode::STAGE_NOT_FOUND; |
| } |
| |
| void MetaServiceImpl::drop_stage(google::protobuf::RpcController* controller, |
| const DropStageRequest* request, DropStageResponse* response, |
| ::google::protobuf::Closure* done) { |
| StopWatch sw; |
| auto ctrl = static_cast<brpc::Controller*>(controller); |
| LOG(INFO) << "rpc from " << ctrl->remote_side() << " request=" << request->DebugString(); |
| brpc::ClosureGuard closure_guard(done); |
| int ret = 0; |
| MetaServiceCode code = MetaServiceCode::OK; |
| std::string msg = "OK"; |
| std::string instance_id; |
| bool drop_request = false; |
| DORIS_CLOUD_DEFER { |
| response->mutable_status()->set_code(code); |
| response->mutable_status()->set_msg(msg); |
| LOG(INFO) << (ret == 0 ? "succ to " : "failed to ") << __PRETTY_FUNCTION__ << " " |
| << ctrl->remote_side() << " " << msg; |
| closure_guard.reset(nullptr); |
| if (config::use_detailed_metrics && !instance_id.empty() && !drop_request) { |
| g_bvar_ms_drop_stage.put(instance_id, sw.elapsed_us()); |
| } |
| }; |
| |
| std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : ""; |
| if (cloud_unique_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "cloud unique id not set"; |
| return; |
| } |
| |
| instance_id = get_instance_id(resource_mgr_, cloud_unique_id); |
| if (instance_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "empty instance_id"; |
| LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id; |
| return; |
| } |
| RPC_RATE_LIMIT(drop_stage) |
| |
| if (!request->has_type()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "stage type not set"; |
| return; |
| } |
| auto type = request->type(); |
| |
| if (type == StagePB::EXTERNAL && request->stage_name().empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "external stage but not set stage name"; |
| return; |
| } |
| |
| if (type == StagePB::INTERNAL && request->mysql_user_id().empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "internal stage but not set user id"; |
| return; |
| } |
| |
| InstanceKeyInfo key_info {instance_id}; |
| std::string key; |
| std::string val; |
| instance_key(key_info, &key); |
| |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| msg = "failed to create txn"; |
| LOG(WARNING) << msg << " err=" << err; |
| return; |
| } |
| |
| err = txn->get(key, &val); |
| LOG(INFO) << "get instance_key=" << hex(key); |
| std::stringstream ss; |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| ss << "failed to get instance, instance_id=" << instance_id << " err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| |
| InstanceInfoPB instance; |
| if (!instance.ParseFromString(val)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| msg = "failed to parse InstanceInfoPB"; |
| return; |
| } |
| |
| StagePB stage; |
| int idx = -1; |
| for (int i = 0; i < instance.stages_size(); ++i) { |
| auto& s = instance.stages(i); |
| if ((type == StagePB::INTERNAL && s.type() == StagePB::INTERNAL && |
| s.mysql_user_id(0) == request->mysql_user_id()) || |
| (type == StagePB::EXTERNAL && s.type() == StagePB::EXTERNAL && |
| s.name() == request->stage_name())) { |
| idx = i; |
| stage = s; |
| break; |
| } |
| } |
| if (idx == -1) { |
| ss << "stage not found with " << proto_to_json(*request); |
| msg = ss.str(); |
| code = MetaServiceCode::STAGE_NOT_FOUND; |
| return; |
| } |
| |
| auto& stages = const_cast<std::decay_t<decltype(instance.stages())>&>(instance.stages()); |
| stages.DeleteSubrange(idx, 1); // Remove it |
| val = instance.SerializeAsString(); |
| if (val.empty()) { |
| msg = "failed to serialize"; |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| return; |
| } |
| txn->put(key, val); |
| LOG(INFO) << "put instance_id=" << instance_id << " instance_key=" << hex(key) |
| << " json=" << proto_to_json(instance); |
| |
| std::string key1; |
| std::string val1; |
| if (type == StagePB::INTERNAL) { |
| RecycleStageKeyInfo recycle_stage_key_info {instance_id, stage.stage_id()}; |
| recycle_stage_key(recycle_stage_key_info, &key1); |
| RecycleStagePB recycle_stage; |
| recycle_stage.set_instance_id(instance_id); |
| recycle_stage.set_reason(request->reason()); |
| recycle_stage.mutable_stage()->CopyFrom(stage); |
| val1 = recycle_stage.SerializeAsString(); |
| if (val1.empty()) { |
| msg = "failed to serialize"; |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| return; |
| } |
| txn->put(key1, val1); |
| } |
| |
| txn->atomic_add(system_meta_service_instance_update_key(), 1); |
| err = txn->commit(); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::COMMIT>(err); |
| msg = fmt::format("failed to commit kv txn, err={}", err); |
| LOG(WARNING) << msg; |
| } |
| } |
| |
| void MetaServiceImpl::get_iam(google::protobuf::RpcController* controller, |
| const GetIamRequest* request, GetIamResponse* response, |
| ::google::protobuf::Closure* done) { |
| RPC_PREPROCESS(get_iam, get); |
| std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : ""; |
| if (cloud_unique_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "cloud unique id not set"; |
| return; |
| } |
| |
| instance_id = get_instance_id(resource_mgr_, cloud_unique_id); |
| if (instance_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "empty instance_id"; |
| LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id; |
| return; |
| } |
| RPC_RATE_LIMIT(get_iam) |
| |
| InstanceKeyInfo key_info {instance_id}; |
| std::string key; |
| std::string val; |
| instance_key(key_info, &key); |
| |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| msg = "failed to create txn"; |
| LOG(WARNING) << msg << " err=" << err; |
| return; |
| } |
| err = txn->get(key, &val); |
| LOG(INFO) << "get instance_key=" << hex(key); |
| |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| ss << "failed to get instance, instance_id=" << instance_id << " err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| |
| InstanceInfoPB instance; |
| if (!instance.ParseFromString(val)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| msg = "failed to parse InstanceInfoPB"; |
| return; |
| } |
| |
| val.clear(); |
| err = txn->get(system_meta_service_arn_info_key(), &val); |
| if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| // For compatibility, use arn_info of config |
| RamUserPB iam_user; |
| iam_user.set_user_id(config::arn_id); |
| iam_user.set_external_id(instance_id); |
| iam_user.set_ak(config::arn_ak); |
| iam_user.set_sk(config::arn_sk); |
| response->mutable_iam_user()->CopyFrom(iam_user); |
| } else if (err == TxnErrorCode::TXN_OK) { |
| RamUserPB iam_user; |
| if (!iam_user.ParseFromString(val)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| msg = "failed to parse RamUserPB"; |
| return; |
| } |
| AkSkPair plain_ak_sk_pair; |
| int ret = decrypt_ak_sk_helper(iam_user.ak(), iam_user.sk(), iam_user.encryption_info(), |
| &plain_ak_sk_pair, code, msg); |
| if (ret != 0) return; |
| iam_user.set_external_id(instance_id); |
| iam_user.set_ak(std::move(plain_ak_sk_pair.first)); |
| iam_user.set_sk(std::move(plain_ak_sk_pair.second)); |
| response->mutable_iam_user()->CopyFrom(iam_user); |
| } else { |
| code = cast_as<ErrCategory::READ>(err); |
| ss << "failed to get arn_info_key, err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| |
| if (instance.has_ram_user()) { |
| RamUserPB ram_user; |
| ram_user.CopyFrom(instance.ram_user()); |
| if (ram_user.has_encryption_info()) { |
| AkSkPair plain_ak_sk_pair; |
| int ret = decrypt_ak_sk_helper(ram_user.ak(), ram_user.sk(), ram_user.encryption_info(), |
| &plain_ak_sk_pair, code, msg); |
| if (ret != 0) return; |
| ram_user.set_ak(std::move(plain_ak_sk_pair.first)); |
| ram_user.set_sk(std::move(plain_ak_sk_pair.second)); |
| } |
| response->mutable_ram_user()->CopyFrom(ram_user); |
| } |
| } |
| |
| void MetaServiceImpl::alter_iam(google::protobuf::RpcController* controller, |
| const AlterIamRequest* request, AlterIamResponse* response, |
| ::google::protobuf::Closure* done) { |
| RPC_PREPROCESS(alter_iam, get, put); |
| std::string arn_id = request->has_account_id() ? request->account_id() : ""; |
| std::string arn_ak = request->has_ak() ? request->ak() : ""; |
| std::string arn_sk = request->has_sk() ? request->sk() : ""; |
| if (arn_id.empty() || arn_ak.empty() || arn_sk.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "invalid argument"; |
| return; |
| } |
| RPC_RATE_LIMIT(alter_iam) |
| |
| // for metric, give it a common instance id |
| instance_id = "alter_iam_instance"; |
| |
| std::string key = system_meta_service_arn_info_key(); |
| std::string val; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| msg = "failed to create txn"; |
| LOG(WARNING) << msg << " err=" << err; |
| return; |
| } |
| err = txn->get(key, &val); |
| if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| code = cast_as<ErrCategory::READ>(err); |
| ss << "fail to arn_info_key, err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| |
| bool is_add_req = err == TxnErrorCode::TXN_KEY_NOT_FOUND; |
| EncryptionInfoPB encryption_info; |
| AkSkPair cipher_ak_sk_pair; |
| if (encrypt_ak_sk_helper(arn_ak, arn_sk, &encryption_info, &cipher_ak_sk_pair, code, msg) != |
| 0) { |
| return; |
| } |
| const auto& [ak, sk] = cipher_ak_sk_pair; |
| RamUserPB iam_user; |
| std::string old_ak; |
| std::string old_sk; |
| if (!is_add_req) { |
| if (!iam_user.ParseFromString(val)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| ss << "failed to parse RamUserPB"; |
| msg = ss.str(); |
| return; |
| } |
| |
| if (arn_id == iam_user.user_id() && ak == iam_user.ak() && sk == iam_user.sk()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| ss << "already has the same arn info"; |
| msg = ss.str(); |
| return; |
| } |
| old_ak = iam_user.ak(); |
| old_sk = iam_user.sk(); |
| } |
| iam_user.set_user_id(arn_id); |
| iam_user.set_ak(std::move(cipher_ak_sk_pair.first)); |
| iam_user.set_sk(std::move(cipher_ak_sk_pair.second)); |
| iam_user.mutable_encryption_info()->CopyFrom(encryption_info); |
| val = iam_user.SerializeAsString(); |
| if (val.empty()) { |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| ss << "failed to serialize"; |
| msg = ss.str(); |
| return; |
| } |
| txn->put(key, val); |
| err = txn->commit(); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::COMMIT>(err); |
| ss << "txn->commit failed() err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| if (is_add_req) { |
| LOG(INFO) << "add new iam info, cipher ak: " << iam_user.ak() |
| << " cipher sk: " << iam_user.sk(); |
| } else { |
| LOG(INFO) << "alter iam info, old: cipher ak: " << old_ak << " cipher sk" << old_sk |
| << " new: cipher ak: " << iam_user.ak() << " cipher sk:" << iam_user.sk(); |
| } |
| } |
| |
| void MetaServiceImpl::alter_ram_user(google::protobuf::RpcController* controller, |
| const AlterRamUserRequest* request, |
| AlterRamUserResponse* response, |
| ::google::protobuf::Closure* done) { |
| RPC_PREPROCESS(alter_ram_user, get, put); |
| instance_id = request->has_instance_id() ? request->instance_id() : ""; |
| if (instance_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "empty instance_id"; |
| return; |
| } |
| if (!request->has_ram_user() || request->ram_user().user_id().empty() || |
| request->ram_user().ak().empty() || request->ram_user().sk().empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "ram user info err " + proto_to_json(*request); |
| return; |
| } |
| auto& ram_user = request->ram_user(); |
| RPC_RATE_LIMIT(alter_ram_user) |
| InstanceKeyInfo key_info {instance_id}; |
| std::string key; |
| std::string val; |
| instance_key(key_info, &key); |
| |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| msg = "failed to create txn"; |
| LOG(WARNING) << msg << " err=" << err; |
| return; |
| } |
| err = txn->get(key, &val); |
| LOG(INFO) << "get instance_key=" << hex(key); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::READ>(err); |
| ss << "failed to get instance, instance_id=" << instance_id << " err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| InstanceInfoPB instance; |
| if (!instance.ParseFromString(val)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| msg = "failed to parse InstanceInfoPB"; |
| return; |
| } |
| if (instance.status() == InstanceInfoPB::DELETED) { |
| code = MetaServiceCode::CLUSTER_NOT_FOUND; |
| msg = "instance status has been set delete, plz check it"; |
| return; |
| } |
| if (instance.has_ram_user()) { |
| LOG(WARNING) << "instance has ram user. instance_id=" << instance_id |
| << ", ram_user_id=" << ram_user.user_id(); |
| } |
| EncryptionInfoPB encryption_info; |
| AkSkPair cipher_ak_sk_pair; |
| if (encrypt_ak_sk_helper(ram_user.ak(), ram_user.sk(), &encryption_info, &cipher_ak_sk_pair, |
| code, msg) != 0) { |
| return; |
| } |
| RamUserPB new_ram_user; |
| new_ram_user.CopyFrom(ram_user); |
| new_ram_user.set_user_id(ram_user.user_id()); |
| new_ram_user.set_ak(std::move(cipher_ak_sk_pair.first)); |
| new_ram_user.set_sk(std::move(cipher_ak_sk_pair.second)); |
| new_ram_user.mutable_encryption_info()->CopyFrom(encryption_info); |
| instance.mutable_ram_user()->CopyFrom(new_ram_user); |
| val = instance.SerializeAsString(); |
| if (val.empty()) { |
| msg = "failed to serialize"; |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| return; |
| } |
| txn->atomic_add(system_meta_service_instance_update_key(), 1); |
| txn->put(key, val); |
| LOG(INFO) << "put instance_id=" << instance_id << " instance_key=" << hex(key); |
| err = txn->commit(); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::COMMIT>(err); |
| msg = fmt::format("failed to commit kv txn, err={}", err); |
| LOG(WARNING) << msg; |
| } |
| } |
| |
| void MetaServiceImpl::begin_copy(google::protobuf::RpcController* controller, |
| const BeginCopyRequest* request, BeginCopyResponse* response, |
| ::google::protobuf::Closure* done) { |
| RPC_PREPROCESS(begin_copy, get, put); |
| std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : ""; |
| if (cloud_unique_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "cloud unique id not set"; |
| return; |
| } |
| |
| instance_id = get_instance_id(resource_mgr_, cloud_unique_id); |
| if (instance_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "empty instance_id"; |
| LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id; |
| return; |
| } |
| RPC_RATE_LIMIT(begin_copy) |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| msg = "failed to create txn"; |
| LOG(WARNING) << msg << " err=" << err; |
| return; |
| } |
| |
| // copy job key |
| CopyJobKeyInfo key_info {instance_id, request->stage_id(), request->table_id(), |
| request->copy_id(), request->group_id()}; |
| std::string key; |
| std::string val; |
| copy_job_key(key_info, &key); |
| // copy job value |
| CopyJobPB copy_job; |
| copy_job.set_stage_type(request->stage_type()); |
| copy_job.set_job_status(CopyJobPB::LOADING); |
| copy_job.set_start_time_ms(request->start_time_ms()); |
| copy_job.set_timeout_time_ms(request->timeout_time_ms()); |
| |
| std::vector<std::pair<std::string, std::string>> copy_files; |
| auto& object_files = request->object_files(); |
| int file_num = 0; |
| size_t file_size = 0; |
| size_t file_meta_size = 0; |
| for (auto i = 0; i < object_files.size(); ++i) { |
| auto& file = object_files.at(i); |
| // 1. get copy file kv to check if file is loading or loaded |
| CopyFileKeyInfo file_key_info {instance_id, request->stage_id(), request->table_id(), |
| file.relative_path(), file.etag()}; |
| std::string file_key; |
| copy_file_key(file_key_info, &file_key); |
| std::string file_val; |
| TxnErrorCode err = txn->get(file_key, &file_val); |
| if (err == TxnErrorCode::TXN_OK) { // found key |
| continue; |
| } else if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) { // error |
| code = cast_as<ErrCategory::READ>(err); |
| msg = fmt::format("failed to get copy file, err={}", err); |
| LOG(WARNING) << msg; |
| return; |
| } |
| // 2. check if reach any limit |
| ++file_num; |
| file_size += file.size(); |
| file_meta_size += file.ByteSizeLong(); |
| if (file_num > 1 && |
| ((request->file_num_limit() > 0 && file_num > request->file_num_limit()) || |
| (request->file_size_limit() > 0 && file_size > request->file_size_limit()) || |
| (request->file_meta_size_limit() > 0 && |
| file_meta_size > request->file_meta_size_limit()))) { |
| break; |
| } |
| // 3. put copy file kv |
| CopyFilePB copy_file; |
| copy_file.set_copy_id(request->copy_id()); |
| copy_file.set_group_id(request->group_id()); |
| std::string copy_file_val = copy_file.SerializeAsString(); |
| if (copy_file_val.empty()) { |
| msg = "failed to serialize"; |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| return; |
| } |
| copy_files.emplace_back(std::move(file_key), std::move(copy_file_val)); |
| // 3. add file to copy job value |
| copy_job.add_object_files()->CopyFrom(file); |
| response->add_filtered_object_files()->CopyFrom(file); |
| } |
| |
| if (file_num == 0) { |
| return; |
| } |
| |
| val = copy_job.SerializeAsString(); |
| if (val.empty()) { |
| msg = "failed to serialize"; |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| return; |
| } |
| // put copy job |
| txn->put(key, val); |
| LOG(INFO) << "put copy_job_key=" << hex(key); |
| // put copy file |
| for (const auto& [k, v] : copy_files) { |
| txn->put(k, v); |
| LOG(INFO) << "put copy_file_key=" << hex(k); |
| } |
| |
| err = txn->commit(); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::COMMIT>(err); |
| msg = fmt::format("failed to commit kv txn, err={}", err); |
| LOG(WARNING) << msg; |
| } |
| } |
| |
| void MetaServiceImpl::finish_copy(google::protobuf::RpcController* controller, |
| const FinishCopyRequest* request, FinishCopyResponse* response, |
| ::google::protobuf::Closure* done) { |
| RPC_PREPROCESS(finish_copy, get, put, del); |
| std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : ""; |
| if (cloud_unique_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "cloud unique id not set"; |
| return; |
| } |
| |
| instance_id = get_instance_id(resource_mgr_, cloud_unique_id); |
| if (instance_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "empty instance_id"; |
| LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id; |
| return; |
| } |
| RPC_RATE_LIMIT(finish_copy) |
| |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| msg = "failed to create txn"; |
| LOG(WARNING) << msg << " err=" << err; |
| return; |
| } |
| |
| // copy job key |
| CopyJobKeyInfo key_info {instance_id, request->stage_id(), request->table_id(), |
| request->copy_id(), request->group_id()}; |
| std::string key; |
| std::string val; |
| copy_job_key(key_info, &key); |
| err = txn->get(key, &val); |
| LOG(INFO) << "get copy_job_key=" << hex(key); |
| |
| if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { // not found |
| code = MetaServiceCode::COPY_JOB_NOT_FOUND; |
| ss << "copy job does not found"; |
| msg = ss.str(); |
| return; |
| } else if (err != TxnErrorCode::TXN_OK) { // error |
| code = cast_as<ErrCategory::READ>(err); |
| ss << "failed to get copy_job, instance_id=" << instance_id << " err=" << err; |
| msg = ss.str(); |
| return; |
| } |
| |
| CopyJobPB copy_job; |
| if (!copy_job.ParseFromString(val)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| msg = "failed to parse CopyJobPB"; |
| return; |
| } |
| |
| std::vector<std::string> copy_files; |
| if (request->action() == FinishCopyRequest::COMMIT) { |
| // 1. update copy job status from Loading to Finish |
| copy_job.set_job_status(CopyJobPB::FINISH); |
| if (request->has_finish_time_ms()) { |
| copy_job.set_finish_time_ms(request->finish_time_ms()); |
| } |
| val = copy_job.SerializeAsString(); |
| if (val.empty()) { |
| msg = "failed to serialize"; |
| code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; |
| return; |
| } |
| txn->put(key, val); |
| LOG(INFO) << "put copy_job_key=" << hex(key); |
| } else if (request->action() == FinishCopyRequest::ABORT || |
| request->action() == FinishCopyRequest::REMOVE) { |
| // 1. remove copy job kv |
| // 2. remove copy file kvs |
| txn->remove(key); |
| LOG(INFO) << (request->action() == FinishCopyRequest::ABORT ? "abort" : "remove") |
| << " copy_job_key=" << hex(key); |
| for (const auto& file : copy_job.object_files()) { |
| // copy file key |
| CopyFileKeyInfo file_key_info {instance_id, request->stage_id(), request->table_id(), |
| file.relative_path(), file.etag()}; |
| std::string file_key; |
| copy_file_key(file_key_info, &file_key); |
| copy_files.emplace_back(std::move(file_key)); |
| } |
| for (const auto& k : copy_files) { |
| txn->remove(k); |
| LOG(INFO) << "remove copy_file_key=" << hex(k); |
| } |
| } else { |
| msg = "Unhandled action"; |
| code = MetaServiceCode::UNDEFINED_ERR; |
| return; |
| } |
| |
| err = txn->commit(); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::COMMIT>(err); |
| msg = fmt::format("failed to commit kv txn, err={}", err); |
| LOG(WARNING) << msg; |
| } |
| } |
| |
| void MetaServiceImpl::get_copy_job(google::protobuf::RpcController* controller, |
| const GetCopyJobRequest* request, GetCopyJobResponse* response, |
| ::google::protobuf::Closure* done) { |
| RPC_PREPROCESS(get_copy_job, get); |
| std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : ""; |
| if (cloud_unique_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "cloud unique id not set"; |
| return; |
| } |
| |
| instance_id = get_instance_id(resource_mgr_, cloud_unique_id); |
| if (instance_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "empty instance_id"; |
| LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id; |
| return; |
| } |
| |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| msg = "failed to create txn"; |
| LOG(WARNING) << msg << " err=" << err; |
| return; |
| } |
| |
| CopyJobKeyInfo key_info {instance_id, request->stage_id(), request->table_id(), |
| request->copy_id(), request->group_id()}; |
| std::string key; |
| copy_job_key(key_info, &key); |
| std::string val; |
| err = txn->get(key, &val); |
| if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { // not found key |
| return; |
| } else if (err != TxnErrorCode::TXN_OK) { // error |
| code = cast_as<ErrCategory::READ>(err); |
| msg = fmt::format("failed to get copy job, err={}", err); |
| LOG(WARNING) << msg << " err=" << err; |
| return; |
| } |
| CopyJobPB copy_job; |
| if (!copy_job.ParseFromString(val)) { |
| code = MetaServiceCode::PROTOBUF_PARSE_ERR; |
| msg = "failed to parse CopyJobPB"; |
| return; |
| } |
| response->mutable_copy_job()->CopyFrom(copy_job); |
| } |
| |
// Returns all object files attached to the copy jobs of a (stage, table).
// Performs a range scan over the copy job keyspace of the pair and flattens
// every job's object_files list into the response.
void MetaServiceImpl::get_copy_files(google::protobuf::RpcController* controller,
                                     const GetCopyFilesRequest* request,
                                     GetCopyFilesResponse* response,
                                     ::google::protobuf::Closure* done) {
    RPC_PREPROCESS(get_copy_files, get);
    std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
    if (cloud_unique_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "cloud unique id not set";
        return;
    }

    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
    if (instance_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "empty instance_id";
        LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id;
        return;
    }
    RPC_RATE_LIMIT(get_copy_files)

    TxnErrorCode err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::CREATE>(err);
        msg = "failed to create txn";
        LOG(WARNING) << msg << " err=" << err;
        return;
    }

    // Scan bounds: [ (stage, table, "", 0), (stage, table + 1, "", 0) ) covers
    // every copy job of this table regardless of copy_id/group_id.
    CopyJobKeyInfo key_info0 {instance_id, request->stage_id(), request->table_id(), "", 0};
    CopyJobKeyInfo key_info1 {instance_id, request->stage_id(), request->table_id() + 1, "", 0};
    std::string key0;
    std::string key1;
    copy_job_key(key_info0, &key0);
    copy_job_key(key_info1, &key1);
    std::unique_ptr<RangeGetIterator> it;
    do {
        // NOTE: this inner `err` intentionally shadows the outer create_txn err.
        TxnErrorCode err = txn->get(key0, key1, &it);
        if (err != TxnErrorCode::TXN_OK) {
            code = cast_as<ErrCategory::READ>(err);
            msg = fmt::format("failed to get copy jobs, err={}", err);
            LOG(WARNING) << msg << " err=" << err;
            return;
        }

        while (it->has_next()) {
            auto [k, v] = it->next();
            // Remember the last key of this batch so the next round of the
            // outer loop can resume the scan right after it.
            if (!it->has_next()) key0 = k;
            CopyJobPB copy_job;
            if (!copy_job.ParseFromArray(v.data(), v.size())) {
                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
                msg = "failed to parse CopyJobPB";
                return;
            }
            // TODO check if job is timeout
            for (const auto& file : copy_job.object_files()) {
                response->add_object_files()->CopyFrom(file);
            }
        }
        // Make the resume key exclusive of the last-seen key ('\x00' is the
        // smallest possible suffix), then continue while more data remains.
        key0.push_back('\x00');
    } while (it->more());
}
| |
| void MetaServiceImpl::filter_copy_files(google::protobuf::RpcController* controller, |
| const FilterCopyFilesRequest* request, |
| FilterCopyFilesResponse* response, |
| ::google::protobuf::Closure* done) { |
| RPC_PREPROCESS(filter_copy_files, get); |
| std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : ""; |
| if (cloud_unique_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "cloud unique id not set"; |
| return; |
| } |
| |
| instance_id = get_instance_id(resource_mgr_, cloud_unique_id); |
| if (instance_id.empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "empty instance_id"; |
| LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id; |
| return; |
| } |
| RPC_RATE_LIMIT(filter_copy_files) |
| |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| code = cast_as<ErrCategory::CREATE>(err); |
| msg = "failed to create txn"; |
| LOG(WARNING) << msg << " err=" << err; |
| return; |
| } |
| |
| std::vector<ObjectFilePB> filter_files; |
| for (auto i = 0; i < request->object_files().size(); ++i) { |
| auto& file = request->object_files().at(i); |
| // 1. get copy file kv to check if file is loading or loaded |
| CopyFileKeyInfo file_key_info {instance_id, request->stage_id(), request->table_id(), |
| file.relative_path(), file.etag()}; |
| std::string file_key; |
| copy_file_key(file_key_info, &file_key); |
| std::string file_val; |
| TxnErrorCode err = txn->get(file_key, &file_val); |
| if (err == TxnErrorCode::TXN_OK) { // found key |
| continue; |
| } else if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) { // error |
| msg = fmt::format("failed to get copy file, err={}", err); |
| LOG(WARNING) << msg << " err=" << err; |
| return; |
| } else { |
| response->add_object_files()->CopyFrom(file); |
| } |
| } |
| } |
| |
| void MetaServiceImpl::get_cluster_status(google::protobuf::RpcController* controller, |
| const GetClusterStatusRequest* request, |
| GetClusterStatusResponse* response, |
| ::google::protobuf::Closure* done) { |
| RPC_PREPROCESS(get_cluster_status, get); |
| if (request->instance_ids().empty() && request->cloud_unique_ids().empty()) { |
| code = MetaServiceCode::INVALID_ARGUMENT; |
| msg = "cloud_unique_ids or instance_ids must be given, instance_ids.size: " + |
| std::to_string(request->instance_ids().size()) + |
| " cloud_unique_ids.size: " + std::to_string(request->cloud_unique_ids().size()); |
| return; |
| } |
| |
| std::vector<std::string> instance_ids; |
| instance_ids.reserve( |
| std::max(request->instance_ids().size(), request->cloud_unique_ids().size())); |
| |
| // priority use instance_ids |
| if (!request->instance_ids().empty()) { |
| std::for_each(request->instance_ids().begin(), request->instance_ids().end(), |
| [&](const auto& it) { instance_ids.emplace_back(it); }); |
| } else if (!request->cloud_unique_ids().empty()) { |
| std::for_each(request->cloud_unique_ids().begin(), request->cloud_unique_ids().end(), |
| [&](const auto& it) { |
| std::string instance_id = get_instance_id(resource_mgr_, it); |
| if (instance_id.empty()) { |
| LOG(INFO) << "cant get instance_id from cloud_unique_id : " << it; |
| return; |
| } |
| instance_ids.emplace_back(instance_id); |
| }); |
| } |
| |
| if (instance_ids.empty()) { |
| LOG(INFO) << "can't get valid instanceids"; |
| return; |
| } |
| bool has_filter = request->has_status(); |
| |
| RPC_RATE_LIMIT(get_cluster_status) |
| |
| auto get_clusters_info = [this, &request, &response, |
| &has_filter](const std::string& instance_id) { |
| InstanceKeyInfo key_info {instance_id}; |
| std::string key; |
| std::string val; |
| instance_key(key_info, &key); |
| |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv_->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| LOG(WARNING) << "failed to create txn err=" << err; |
| return; |
| } |
| DORIS_CLOUD_DEFER { |
| if (config::use_detailed_metrics && txn != nullptr) { |
| g_bvar_rpc_kv_get_cluster_status_get_bytes.put({instance_id}, txn->get_bytes()); |
| g_bvar_rpc_kv_get_cluster_status_get_counter.put({instance_id}, |
| txn->num_get_keys()); |
| } |
| }; |
| err = txn->get(key, &val); |
| LOG(INFO) << "get instance_key=" << hex(key); |
| |
| if (err != TxnErrorCode::TXN_OK) { |
| LOG(WARNING) << "failed to get instance, instance_id=" << instance_id << " err=" << err; |
| return; |
| } |
| InstanceInfoPB instance; |
| if (!instance.ParseFromString(val)) { |
| LOG(WARNING) << "failed to parse InstanceInfoPB"; |
| return; |
| } |
| GetClusterStatusResponse::GetClusterStatusResponseDetail detail; |
| detail.set_instance_id(instance_id); |
| for (auto& cluster : instance.clusters()) { |
| if (cluster.type() != ClusterPB::COMPUTE) { |
| continue; |
| } |
| ClusterPB pb; |
| pb.set_cluster_name(cluster.cluster_name()); |
| pb.set_cluster_id(cluster.cluster_id()); |
| if (has_filter && request->status() != cluster.cluster_status()) { |
| continue; |
| } |
| // for compatible |
| if (cluster.has_cluster_status()) { |
| pb.set_cluster_status(cluster.cluster_status()); |
| } else { |
| pb.set_cluster_status(ClusterStatus::NORMAL); |
| } |
| detail.add_clusters()->CopyFrom(pb); |
| } |
| if (detail.clusters().size() == 0) { |
| return; |
| } |
| response->add_details()->CopyFrom(detail); |
| }; |
| |
| std::for_each(instance_ids.begin(), instance_ids.end(), get_clusters_info); |
| |
| msg = proto_to_json(*response); |
| } |
| |
| void notify_refresh_instance(std::shared_ptr<TxnKv> txn_kv, const std::string& instance_id, |
| KVStats* stats, bool include_self) { |
| if (!config::enable_notify_instance_update) { |
| LOG(INFO) << "notify_refresh_instance is disabled"; |
| return; |
| } |
| |
| LOG(INFO) << "begin notify_refresh_instance, include_self=" << include_self; |
| TEST_SYNC_POINT_RETURN_WITH_VOID("notify_refresh_instance_return"); |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv->create_txn(&txn); |
| if (err != TxnErrorCode::TXN_OK) { |
| LOG(WARNING) << "failed to create txn err=" << err; |
| return; |
| } |
| std::string key = system_meta_service_registry_key(); |
| std::string val; |
| err = txn->get(key, &val); |
| if (stats) { |
| stats->get_counter++; |
| } |
| if (err != TxnErrorCode::TXN_OK) { |
| LOG(WARNING) << "failed to get server registry" |
| << " err=" << err; |
| return; |
| } |
| if (stats) { |
| stats->get_bytes += val.size() + key.size(); |
| } |
| std::string self_endpoint = |
| config::hostname.empty() ? get_local_ip(config::priority_networks) : config::hostname; |
| self_endpoint = fmt::format("{}:{}", self_endpoint, config::brpc_listen_port); |
| ServiceRegistryPB reg; |
| reg.ParseFromString(val); |
| |
| brpc::ChannelOptions options; |
| options.connection_type = brpc::ConnectionType::CONNECTION_TYPE_SHORT; |
| |
| static std::unordered_map<std::string, std::shared_ptr<MetaService_Stub>> stubs; |
| static std::mutex mtx; |
| |
| std::vector<bthread_t> btids; |
| btids.reserve(reg.items_size()); |
| for (int i = 0; i < reg.items_size(); ++i) { |
| int ret = 0; |
| auto& e = reg.items(i); |
| std::string endpoint; |
| if (e.has_host()) { |
| endpoint = fmt::format("{}:{}", e.host(), e.port()); |
| } else { |
| endpoint = fmt::format("{}:{}", e.ip(), e.port()); |
| } |
| if (endpoint == self_endpoint && !include_self) continue; |
| |
| // Prepare stub |
| std::shared_ptr<MetaService_Stub> stub; |
| do { |
| std::lock_guard l(mtx); |
| if (auto it = stubs.find(endpoint); it != stubs.end()) { |
| stub = it->second; |
| break; |
| } |
| auto channel = std::make_unique<brpc::Channel>(); |
| ret = channel->Init(endpoint.c_str(), &options); |
| if (ret != 0) { |
| LOG(WARNING) << "fail to init brpc channel, endpoint=" << endpoint; |
| break; |
| } |
| stub = std::make_shared<MetaService_Stub>(channel.release(), |
| google::protobuf::Service::STUB_OWNS_CHANNEL); |
| } while (false); |
| if (ret != 0) continue; |
| |
| // Issue RPC |
| auto f = new std::function<void()>([instance_id, stub, endpoint] { |
| int num_try = 0; |
| bool succ = false; |
| while (num_try++ < 3) { |
| brpc::Controller cntl; |
| cntl.set_timeout_ms(3000); |
| AlterInstanceRequest req; |
| AlterInstanceResponse res; |
| req.set_instance_id(instance_id); |
| req.set_op(AlterInstanceRequest::REFRESH); |
| stub->alter_instance(&cntl, &req, &res, nullptr); |
| if (cntl.Failed()) { |
| LOG_WARNING("issue refresh instance rpc") |
| .tag("endpoint", endpoint) |
| .tag("num_try", num_try) |
| .tag("code", cntl.ErrorCode()) |
| .tag("msg", cntl.ErrorText()); |
| } else { |
| succ = res.status().code() == MetaServiceCode::OK; |
| LOG(INFO) << (succ ? "succ" : "failed") |
| << " to issue refresh_instance rpc, num_try=" << num_try |
| << " endpoint=" << endpoint << " response=" << proto_to_json(res); |
| if (succ) return; |
| } |
| bthread_usleep(300000); |
| } |
| if (succ) return; |
| LOG(WARNING) << "failed to refresh finally, it may left the system inconsistent," |
| << " tired=" << num_try; |
| }); |
| bthread_t bid; |
| ret = bthread_start_background(&bid, nullptr, run_bthread_work, f); |
| if (ret != 0) continue; |
| btids.emplace_back(bid); |
| } // for |
| for (auto& i : btids) bthread_join(i, nullptr); |
| LOG(INFO) << "finish notify_refresh_instance, num_items=" << reg.items_size(); |
| } |
| |
| } // namespace doris::cloud |