blob: 9c37d7817655109f0b256365ecce9e50cee9261d [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "resource_manager.h"
#include <gen_cpp/cloud.pb.h>
#include <regex>
#include <sstream>
#include "common/logging.h"
#include "common/string_util.h"
#include "common/util.h"
#include "cpp/sync_point.h"
#include "meta-service/keys.h"
#include "meta-service/meta_service_helper.h"
#include "meta-service/txn_kv_error.h"
namespace doris::cloud {
// Monotonic counter used only to correlate the begin/finish log lines of
// refresh_instance() calls; incremented once per call.
static std::atomic_int64_t seq = 0;
// Scans every InstanceInfoPB in the KV store and builds the in-memory
// cluster/node index (node_info_).
// Returns 0 on success, -1 on KV/parse errors, -2 on malformed instance keys.
int ResourceManager::init() {
    // Scan all instances
    std::unique_ptr<Transaction> txn;
    TxnErrorCode err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        LOG(INFO) << "failed to init txn, err=" << err;
        return -1;
    }
    // Full range over the instance key space: ["", "\xff")
    InstanceKeyInfo key0_info {""};
    InstanceKeyInfo key1_info {"\xff"}; // instance id are human readable strings
    std::string key0;
    std::string key1;
    instance_key(key0_info, &key0);
    instance_key(key1_info, &key1);

    std::unique_ptr<RangeGetIterator> it;

    int num_instances = 0;
    // RAII defer: log a scan summary on every exit path.
    std::unique_ptr<int, std::function<void(int*)>> defer_log_range(
            (int*)0x01, [key0, key1, &num_instances](int*) {
                LOG(INFO) << "get instances, num_instances=" << num_instances << " range=["
                          << hex(key0) << "," << hex(key1) << "]";
            });

    // instance_id  instance
    std::vector<std::tuple<std::string, InstanceInfoPB>> instances;
    int limit = 10000;
    TEST_SYNC_POINT_CALLBACK("ResourceManager:init:limit", &limit);
    do {
        TxnErrorCode err = txn->get(key0, key1, &it, false, limit);
        TEST_SYNC_POINT_CALLBACK("ResourceManager:init:get_err", &err);
        if (err == TxnErrorCode::TXN_TOO_OLD) {
            // Snapshot expired mid-scan; retry the remaining range on a fresh txn.
            LOG(WARNING) << "failed to get instance, err=txn too old, "
                         << " already read " << instances.size() << " instance, "
                         << " now fallback to non snapshot scan";
            err = txn_kv_->create_txn(&txn);
            if (err == TxnErrorCode::TXN_OK) {
                err = txn->get(key0, key1, &it);
            }
        }
        if (err != TxnErrorCode::TXN_OK) {
            LOG(WARNING) << "internal error, failed to get instance, err=" << err;
            return -1;
        }

        while (it->has_next()) {
            auto [k, v] = it->next();
            // Remember the last key seen so the next round resumes after it.
            if (!it->has_next()) key0 = k;
            LOG(INFO) << "range get instance_key=" << hex(k);
            instances.emplace_back("", InstanceInfoPB {});
            auto& [instance_id, inst] = instances.back();
            if (!inst.ParseFromArray(v.data(), v.size())) {
                LOG(WARNING) << "malformed instance, unable to deserialize, key=" << hex(k);
                return -1;
            }
            // 0x01 "instance" ${instance_id} -> InstanceInfoPB
            k.remove_prefix(1); // Remove key space
            std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
            int ret = decode_key(&k, &out);
            if (ret != 0) {
                LOG(WARNING) << "failed to decode key, ret=" << ret;
                return -2;
            }
            if (out.size() != 2) {
                // BUGFIX: previously this only warned and then indexed out[1]
                // anyway, which is out-of-bounds (UB) when out.size() < 2.
                LOG(WARNING) << "decoded size no match, expect 2, given=" << out.size();
                return -2;
            }
            instance_id = std::get<std::string>(std::get<0>(out[1]));
            LOG(INFO) << "get an instance, instance_id=" << instance_id
                      << " instance json=" << proto_to_json(inst);
            ++num_instances;
        }
        key0.push_back('\x00'); // Update to next smallest key for iteration
    } while (it->more());

    // Populate the in-memory index from everything just read.
    for (auto& [inst_id, inst] : instances) {
        for (auto& c : inst.clusters()) {
            add_cluster_to_index(inst_id, c);
        }
    }

    return 0;
}
std::string ResourceManager::get_node(const std::string& cloud_unique_id,
                                      std::vector<NodeInfo>* nodes) {
    // Looks up all nodes registered under cloud_unique_id and appends copies
    // to *nodes. Returns "" on success or a short error string on a miss.
    // FIXME(gavin): refresh the all instance if there is a miss?
    //               Or we can refresh all instances regularly to reduce
    //               read amplification.
    std::shared_lock l(mtx_);
    auto [begin, end] = node_info_.equal_range(cloud_unique_id);
    bool miss = (begin == node_info_.end() || begin->first != cloud_unique_id);
    if (miss) {
        LOG(INFO) << "cloud unique id not found cloud_unique_id=" << cloud_unique_id;
        return "cloud_unique_id not found";
    }
    nodes->reserve(nodes->size() + node_info_.count(cloud_unique_id));
    for (auto it = begin; it != end; ++it) {
        nodes->emplace_back(it->second); // Just copy, it's cheap
    }
    return "";
}
// Validates a ClusterPB before it is added/updated:
//  - the cluster type must be present;
//  - optionally the cluster name must match ^[a-zA-Z][a-zA-Z0-9_]*$;
//  - each node must carry the port/type fields its cluster type requires;
//  - when check_master_num is true, the FE master/follower topology is checked.
// On failure fills *err with a human-readable message and returns false.
bool ResourceManager::check_cluster_params_valid(const ClusterPB& cluster, std::string* err,
                                                 bool check_master_num) {
    // check
    if (!cluster.has_type()) {
        *err = "cluster must have type arg";
        return false;
    }
    const char* cluster_pattern_str = "^[a-zA-Z][a-zA-Z0-9_]*$";
    std::regex txt_regex(cluster_pattern_str);
    if (config::enable_cluster_name_check && cluster.has_cluster_name() &&
        !std::regex_match(cluster.cluster_name(), txt_regex)) {
        *err = "cluster name not regex with ^[a-zA-Z][a-zA-Z0-9_]*$, please check it";
        return false;
    }

    std::stringstream ss;
    bool no_err = true;
    int master_num = 0;
    int follower_num = 0;
    for (auto& n : cluster.nodes()) {
        // check here cloud_unique_id
        std::string cloud_unique_id = n.cloud_unique_id();
        auto [is_degrade_format, instance_id] = get_instance_id_by_cloud_unique_id(cloud_unique_id);
        if (config::enable_check_instance_id && is_degrade_format &&
            !is_instance_id_registered(instance_id)) {
            ss << "node=" << n.DebugString()
               << " cloud_unique_id use degrade format, but check instance failed";
            *err = ss.str();
            return false;
        }

        // FE nodes need an edit_log_port and a valid FE node type;
        // BE nodes need a non-zero heartbeat_port.
        if (ClusterPB::SQL == cluster.type() && n.has_edit_log_port() && n.edit_log_port() &&
            n.has_node_type() &&
            (n.node_type() == NodeInfoPB_NodeType_FE_MASTER ||
             n.node_type() == NodeInfoPB_NodeType_FE_OBSERVER ||
             n.node_type() == NodeInfoPB_NodeType_FE_FOLLOWER)) {
            master_num += n.node_type() == NodeInfoPB_NodeType_FE_MASTER ? 1 : 0;
            follower_num += n.node_type() == NodeInfoPB_NodeType_FE_FOLLOWER ? 1 : 0;
            continue;
        } else if (ClusterPB::COMPUTE == cluster.type() && n.has_heartbeat_port() &&
                   n.heartbeat_port()) {
            continue;
        }
        // BUGFIX: corrected typos in the user-facing message
        // ("heatbeat_port" -> "heartbeat_port", "banckens" -> "backends").
        ss << "check cluster params failed, edit_log_port is required for frontends while "
              "heartbeat_port is required for backends, node : "
           << proto_to_json(n);
        *err = ss.str();
        no_err = false;
        break;
    }

    if (check_master_num && ClusterPB::SQL == cluster.type()) {
        no_err = false;
        if (master_num > 0 && follower_num > 0) {
            // Mixing an explicit master with followers is invalid.
            ss << "cluster is SQL type, and use multi follower mode, cant set master node, master "
                  "count: "
               << master_num << " follower count: " << follower_num;
        } else if (!follower_num && master_num != 1) {
            // master-observers mode requires exactly one master.
            ss << "cluster is SQL type, must have only one master node, now master count: "
               << master_num;
        } else {
            // followers mode
            // 1. followers 2. observers + followers
            no_err = true;
            ss << "";
        }
        *err = ss.str();
    }
    return no_err;
}
std::pair<bool, std::string> ResourceManager::get_instance_id_by_cloud_unique_id(
        const std::string& cloud_unique_id) {
    // A "degraded format" unique id looks like "<version>:<instance_id>:<suffix>".
    // Returns {true, instance_id} when the id is in that format (version 1),
    // otherwise {false, ""}.
    auto parts = split(cloud_unique_id, ':');
    if (parts.size() != 3) return {false, ""};
    // Only version 1 of the degraded format is recognized.
    if (std::atoi(parts[0].c_str()) != 1) return {false, ""};
    return {true, parts[1]};
}
bool ResourceManager::is_instance_id_registered(const std::string& instance_id) {
    // Probe the KV store; only existence of the instance key matters here,
    // so no InstanceInfoPB output is requested.
    auto [code, info] = get_instance(nullptr, instance_id, nullptr);
    { TEST_SYNC_POINT_CALLBACK("is_instance_id_registered", &code); }
    bool registered = (code == TxnErrorCode::TXN_OK);
    if (!registered) {
        LOG(WARNING) << "failed to check instance instance_id=" << instance_id
                     << ", code=" << format_as(code) << ", info=" + info;
    }
    return registered;
}
// Adds a cluster to an instance: validates cluster params, enforces
// cloud_unique_id / cluster id / cluster name uniqueness, stamps node
// ctime/mtime, persists the updated InstanceInfoPB, and finally updates the
// in-memory index. Returns {OK, ""} on success, otherwise an error code with
// a human-readable message.
std::pair<MetaServiceCode, std::string> ResourceManager::add_cluster(const std::string& instance_id,
                                                                     const ClusterInfo& cluster) {
    std::string msg;
    std::stringstream ss;
    // RAII defer: always log the final outcome message when leaving this scope.
    std::unique_ptr<int, std::function<void(int*)>> defer(
            (int*)0x01, [&msg](int*) { LOG(INFO) << "add_cluster err=" << msg; });
    if (!check_cluster_params_valid(cluster.cluster, &msg, true)) {
        LOG(WARNING) << msg;
        return std::make_pair(MetaServiceCode::INVALID_ARGUMENT, msg);
    }
    // FIXME(gavin): ensure atomicity of the entire process of adding cluster.
    //               Inserting a placeholer to node_info_ before persistence
    //               and updating it after persistence for the cloud_unique_id
    //               to add is a fairly fine solution.
    {
        std::shared_lock l(mtx_);
        // Check uniqueness of cloud_unique_id to add, cloud unique ids are not
        // shared between instances
        for (auto& i : cluster.cluster.nodes()) {
            // check cloud_unique_id in the same instance
            auto [start, end] = node_info_.equal_range(i.cloud_unique_id());
            if (start == node_info_.end() || start->first != i.cloud_unique_id()) continue;
            for (auto it = start; it != end; ++it) {
                if (it->second.instance_id != instance_id) {
                    // different instance, but has same cloud_unique_id
                    ss << "cloud_unique_id is already occupied by an instance,"
                       << " instance_id=" << it->second.instance_id
                       << " cluster_name=" << it->second.cluster_name
                       << " cluster_id=" << it->second.cluster_id
                       << " cloud_unique_id=" << it->first;
                    msg = ss.str();
                    LOG(INFO) << msg;
                    return std::make_pair(MetaServiceCode::ALREADY_EXISTED, msg);
                }
            }
        }
    }
    std::unique_ptr<Transaction> txn0;
    TxnErrorCode err = txn_kv_->create_txn(&txn0);
    if (err != TxnErrorCode::TXN_OK) {
        msg = "failed to create txn";
        LOG(WARNING) << msg << " err=" << err;
        return std::make_pair(cast_as<ErrCategory::CREATE>(err), msg);
    }
    std::shared_ptr<Transaction> txn(txn0.release());
    InstanceInfoPB instance;
    auto [c0, m0] = get_instance(txn, instance_id, &instance);
    if (c0 != TxnErrorCode::TXN_OK) {
        msg = "failed to get instance, info " + m0;
        LOG(WARNING) << msg << " err=" << c0;
        return std::make_pair(cast_as<ErrCategory::READ>(c0), msg);
    }
    // A deleted instance must not accept new clusters.
    if (instance.status() == InstanceInfoPB::DELETED) {
        msg = "instance status has been set delete, plz check it";
        return std::make_pair(MetaServiceCode::CLUSTER_NOT_FOUND, msg);
    }
    LOG(INFO) << "cluster to add json=" << proto_to_json(cluster.cluster);
    LOG(INFO) << "json=" << proto_to_json(instance);
    // Check id and name, they need to be unique
    // One cluster id per name, name is alias of cluster id
    for (auto& i : instance.clusters()) {
        if (i.cluster_id() == cluster.cluster.cluster_id()) {
            ss << "try to add a existing cluster id,"
               << " existing_cluster_id=" << i.cluster_id();
            msg = ss.str();
            return std::make_pair(MetaServiceCode::ALREADY_EXISTED, msg);
        }
        if (i.cluster_name() == cluster.cluster.cluster_name()) {
            ss << "try to add a existing cluster name,"
               << " existing_cluster_name=" << i.cluster_name();
            msg = ss.str();
            return std::make_pair(MetaServiceCode::ALREADY_EXISTED, msg);
        }
    }
    // TODO(gavin): Check duplicated nodes, one node cannot deploy on multiple clusters
    // Stamp creation/modification time (epoch seconds) on every node to add.
    auto now_time = std::chrono::system_clock::now();
    uint64_t time =
            std::chrono::duration_cast<std::chrono::seconds>(now_time.time_since_epoch()).count();
    for (auto& n : cluster.cluster.nodes()) {
        // NOTE(review): const_cast mutates the caller's ClusterInfo in place.
        auto& node = const_cast<std::decay_t<decltype(n)>&>(n);
        node.set_ctime(time);
        node.set_mtime(time);
    }
    auto to_add_cluster = instance.add_clusters();
    to_add_cluster->CopyFrom(cluster.cluster);
    // create compute cluster, set it status normal as default value
    if (cluster.cluster.type() == ClusterPB::COMPUTE) {
        to_add_cluster->set_cluster_status(ClusterStatus::NORMAL);
    }
    LOG(INFO) << "instance " << instance_id << " has " << instance.clusters().size() << " clusters";
    // Persist the updated instance record.
    InstanceKeyInfo key_info {instance_id};
    std::string key;
    std::string val;
    instance_key(key_info, &key);
    val = instance.SerializeAsString();
    if (val.empty()) {
        msg = "failed to serialize";
        return std::make_pair(MetaServiceCode::PROTOBUF_SERIALIZE_ERR, msg);
    }
    txn->put(key, val);
    LOG(INFO) << "put instance_key=" << hex(key);
    err = txn->commit();
    if (err != TxnErrorCode::TXN_OK) {
        msg = "failed to commit kv txn";
        LOG(WARNING) << msg << " err=" << err;
        return std::make_pair(cast_as<ErrCategory::COMMIT>(err), msg);
    }
    // Commit succeeded; reflect the new cluster in the in-memory index.
    add_cluster_to_index(instance_id, cluster.cluster);
    return std::make_pair(MetaServiceCode::OK, "");
}
// Drops the cluster identified by cluster.cluster.cluster_id() from the
// instance, persists the shrunken InstanceInfoPB, then removes the cluster's
// nodes from the in-memory index. Returns {OK, ""} on success.
std::pair<MetaServiceCode, std::string> ResourceManager::drop_cluster(
        const std::string& instance_id, const ClusterInfo& cluster) {
    std::stringstream ss;
    std::string msg;
    std::string cluster_id = cluster.cluster.has_cluster_id() ? cluster.cluster.cluster_id() : "";
    if (cluster_id.empty()) {
        ss << "missing cluster_id=" << cluster_id;
        msg = ss.str();
        LOG(INFO) << msg;
        return std::make_pair(MetaServiceCode::INVALID_ARGUMENT, msg);
    }
    std::unique_ptr<Transaction> txn0;
    TxnErrorCode err = txn_kv_->create_txn(&txn0);
    if (err != TxnErrorCode::TXN_OK) {
        msg = "failed to create txn";
        LOG(WARNING) << msg << " err=" << err;
        return std::make_pair(cast_as<ErrCategory::CREATE>(err), msg);
    }
    std::shared_ptr<Transaction> txn(txn0.release());
    InstanceInfoPB instance;
    auto [c0, m0] = get_instance(txn, instance_id, &instance);
    if (c0 == TxnErrorCode::TXN_KEY_NOT_FOUND) {
        msg = m0;
        LOG(WARNING) << msg;
        return std::make_pair(MetaServiceCode::CLUSTER_NOT_FOUND, msg);
    }
    if (c0 != TxnErrorCode::TXN_OK) {
        msg = m0;
        LOG(WARNING) << msg;
        return std::make_pair(cast_as<ErrCategory::READ>(c0), msg);
    }
    // A deleted instance has no clusters to drop.
    if (instance.status() == InstanceInfoPB::DELETED) {
        msg = "instance status has been set delete, plz check it";
        return std::make_pair(MetaServiceCode::CLUSTER_NOT_FOUND, msg);
    }
    bool found = false;
    int idx = -1;
    // Keep a copy of the dropped cluster for index cleanup after commit.
    ClusterPB to_del;
    // Check id and name, they need to be unique
    // One cluster id per name, name is alias of cluster id
    for (auto& i : instance.clusters()) {
        ++idx;
        if (i.cluster_id() == cluster.cluster.cluster_id()) {
            to_del.CopyFrom(i);
            LOG(INFO) << "found a cluster to drop,"
                      << " instance_id=" << instance_id << " cluster_id=" << i.cluster_id()
                      << " cluster_name=" << i.cluster_name() << " cluster=" << proto_to_json(i);
            found = true;
            break;
        }
    }
    if (!found) {
        ss << "failed to find cluster to drop,"
           << " instance_id=" << instance_id << " cluster_id=" << cluster.cluster.cluster_id()
           << " cluster_name=" << cluster.cluster.cluster_name();
        msg = ss.str();
        return std::make_pair(MetaServiceCode::CLUSTER_NOT_FOUND, msg);
    }
    InstanceInfoPB new_instance(instance);
    new_instance.mutable_clusters()->DeleteSubrange(idx, 1); // Remove it
    // Persist the updated instance record.
    InstanceKeyInfo key_info {instance_id};
    std::string key;
    std::string val;
    instance_key(key_info, &key);
    val = new_instance.SerializeAsString();
    if (val.empty()) {
        msg = "failed to serialize";
        LOG(WARNING) << msg;
        return std::make_pair(MetaServiceCode::PROTOBUF_SERIALIZE_ERR, msg);
    }
    txn->put(key, val);
    LOG(INFO) << "put instance_key=" << hex(key);
    err = txn->commit();
    if (err != TxnErrorCode::TXN_OK) {
        msg = "failed to commit kv txn";
        LOG(WARNING) << msg << " err=" << err;
        return std::make_pair(cast_as<ErrCategory::COMMIT>(err), msg);
    }
    // Commit succeeded; drop the cluster's nodes from the in-memory index.
    remove_cluster_from_index(instance_id, to_del);
    return std::make_pair(MetaServiceCode::OK, "");
}
// Updates the single cluster matched by `filter` via `action`, persists the
// whole instance record, then refreshes the in-memory index.
// `action` receives the mutable matched cluster plus the set of all existing
// cluster names (for rename-uniqueness checks) and returns "" on success.
// Returns "" on success, otherwise a human-readable error message.
std::string ResourceManager::update_cluster(
        const std::string& instance_id, const ClusterInfo& cluster,
        std::function<bool(const ClusterPB&)> filter,
        std::function<std::string(ClusterPB&, std::set<std::string>& cluster_names)> action) {
    std::stringstream ss;
    std::string msg;
    std::string cluster_id = cluster.cluster.has_cluster_id() ? cluster.cluster.cluster_id() : "";
    std::string cluster_name =
            cluster.cluster.has_cluster_name() ? cluster.cluster.cluster_name() : "";
    if (cluster_id.empty()) {
        ss << "missing cluster_id=" << cluster_id;
        msg = ss.str();
        LOG(INFO) << msg;
        return msg;
    }
    std::unique_ptr<Transaction> txn0;
    TxnErrorCode err = txn_kv_->create_txn(&txn0);
    if (err != TxnErrorCode::TXN_OK) {
        msg = "failed to create txn";
        LOG(WARNING) << msg << " err=" << err;
        return msg;
    }
    std::shared_ptr<Transaction> txn(txn0.release());
    InstanceInfoPB instance;
    auto [c0, m0] = get_instance(txn, instance_id, &instance);
    if (c0 != TxnErrorCode::TXN_OK) {
        msg = m0;
        return msg;
    }
    if (instance.status() == InstanceInfoPB::DELETED) {
        msg = "instance status has been set delete, plz check it";
        return msg;
    }
    // Collect all existing cluster names for the action callback.
    std::set<std::string> cluster_names;
    for (auto& i : instance.clusters()) {
        cluster_names.emplace(i.cluster_name());
    }
    bool found = false;
    int idx = -1;
    // Locate the (first) cluster accepted by the filter.
    for (auto& i : instance.clusters()) {
        ++idx;
        if (filter(i)) {
            LOG(INFO) << "found a cluster to update,"
                      << " instance_id=" << instance_id << " cluster_id=" << i.cluster_id()
                      << " cluster_name=" << i.cluster_name() << " cluster=" << proto_to_json(i);
            found = true;
            break;
        }
    }
    if (!found) {
        ss << "failed to find cluster to update,"
           << " instance_id=" << instance_id << " cluster_id=" << cluster.cluster.cluster_id()
           << " cluster_name=" << cluster.cluster.cluster_name();
        msg = ss.str();
        return msg;
    }
    auto& clusters = const_cast<std::decay_t<decltype(instance.clusters())>&>(instance.clusters());
    // Apply the caller-provided mutation; snapshot before/after states for
    // index maintenance below.
    ClusterPB original = clusters[idx];
    msg = action(clusters[idx], cluster_names);
    if (!msg.empty()) {
        return msg;
    }
    ClusterPB now = clusters[idx];
    LOG(INFO) << "before update cluster original: " << proto_to_json(original)
              << " after update now: " << proto_to_json(now);
    // Persist the updated instance record.
    InstanceKeyInfo key_info {instance_id};
    std::string key;
    std::string val;
    instance_key(key_info, &key);
    val = instance.SerializeAsString();
    if (val.empty()) {
        msg = "failed to serialize";
        return msg;
    }
    txn->put(key, val);
    LOG(INFO) << "put instance_key=" << hex(key); // fixed log typo "instanace_key"
    TxnErrorCode err_code = txn->commit();
    if (err_code != TxnErrorCode::TXN_OK) {
        msg = "failed to commit kv txn";
        // BUGFIX: previously streamed `msg` where the error code belongs,
        // losing the actual commit error from the log line.
        LOG(WARNING) << msg << " err=" << err_code;
        return msg;
    }
    LOG(INFO) << "update cluster instance_id=" << instance_id
              << " instance json=" << proto_to_json(instance);
    update_cluster_to_index(instance_id, original, now);
    return msg;
}
void ResourceManager::update_cluster_to_index(const std::string& instance_id,
                                              const ClusterPB& original, const ClusterPB& now) {
    // Swap the old cluster membership for the new one under a single hold of
    // the index lock, so readers never observe a half-updated index.
    std::lock_guard guard(mtx_);
    remove_cluster_from_index_no_lock(instance_id, original);
    add_cluster_to_index_no_lock(instance_id, now);
}
void ResourceManager::add_cluster_to_index_no_lock(const std::string& instance_id,
const ClusterPB& c) {
auto type = c.has_type() ? c.type() : -1;
Role role = (type == ClusterPB::SQL
? Role::SQL_SERVER
: (type == ClusterPB::COMPUTE ? Role::COMPUTE_NODE : Role::UNDEFINED));
LOG(INFO) << "add cluster to index, instance_id=" << instance_id << " cluster_type=" << type
<< " cluster_name=" << c.cluster_name() << " cluster_id=" << c.cluster_id();
for (auto& i : c.nodes()) {
bool existed = node_info_.count(i.cloud_unique_id());
NodeInfo n {.role = role,
.instance_id = instance_id,
.cluster_name = c.cluster_name(),
.cluster_id = c.cluster_id(),
.node_info = i};
LOG(WARNING) << (existed ? "duplicated cloud_unique_id " : "")
<< "instance_id=" << instance_id << " cloud_unique_id=" << i.cloud_unique_id()
<< " node_info=" << proto_to_json(i);
node_info_.insert({i.cloud_unique_id(), std::move(n)});
}
}
void ResourceManager::add_cluster_to_index(const std::string& instance_id, const ClusterPB& c) {
    // Thread-safe wrapper: acquire the index lock, then delegate.
    std::lock_guard guard(mtx_);
    add_cluster_to_index_no_lock(instance_id, c);
}
void ResourceManager::remove_cluster_from_index_no_lock(const std::string& instance_id,
                                                        const ClusterPB& c) {
    // Erases every index entry belonging to (instance_id, cluster_id,
    // cluster_name). Caller holds mtx_.
    std::string cluster_name = c.cluster_name();
    std::string cluster_id = c.cluster_id();
    int cnt = 0;
    auto it = node_info_.begin();
    while (it != node_info_.end()) {
        const auto& entry = it->second;
        bool match = entry.instance_id == instance_id && entry.cluster_id == cluster_id &&
                     entry.cluster_name == cluster_name;
        if (!match) {
            ++it;
            continue;
        }
        ++cnt;
        LOG(INFO) << "remove node from index, instance_id=" << instance_id
                  << " role=" << static_cast<int>(entry.role)
                  << " cluster_name=" << entry.cluster_name
                  << " cluster_id=" << entry.cluster_id
                  << " node_info=" << proto_to_json(entry.node_info);
        it = node_info_.erase(it);
    }
    LOG(INFO) << cnt << " nodes removed from index, cluster_id=" << cluster_id
              << " cluster_name=" << cluster_name << " instance_id=" << instance_id;
}
void ResourceManager::remove_cluster_from_index(const std::string& instance_id,
                                                const ClusterPB& c) {
    // Thread-safe wrapper: acquire the index lock, then delegate.
    std::lock_guard guard(mtx_);
    remove_cluster_from_index_no_lock(instance_id, c);
}
std::pair<TxnErrorCode, std::string> ResourceManager::get_instance(std::shared_ptr<Transaction> txn,
const std::string& instance_id,
InstanceInfoPB* inst_pb) {
std::pair<TxnErrorCode, std::string> ec {TxnErrorCode::TXN_OK, ""};
[[maybe_unused]] auto& [code, msg] = ec;
std::stringstream ss;
InstanceKeyInfo key_info {instance_id};
std::string key;
std::string val;
instance_key(key_info, &key);
if (txn == nullptr) {
std::unique_ptr<Transaction> txn0;
TxnErrorCode err = txn_kv_->create_txn(&txn0);
if (err != TxnErrorCode::TXN_OK) {
code = err;
msg = "failed to create txn";
LOG(WARNING) << msg << " err=" << err;
return ec;
}
txn.reset(txn0.release());
}
TxnErrorCode err = txn->get(key, &val);
LOG(INFO) << "get instance_key=" << hex(key);
if (err != TxnErrorCode::TXN_OK) {
code = err;
ss << "failed to get instance, instance_id=" << instance_id << " err=" << err;
msg = ss.str();
return ec;
}
if (inst_pb != nullptr && !inst_pb->ParseFromString(val)) {
code = TxnErrorCode::TXN_UNIDENTIFIED_ERROR;
msg = "failed to parse InstanceInfoPB";
return ec;
}
return ec;
}
// check instance pb is valid
bool is_instance_valid(const InstanceInfoPB& instance) {
    // An instance is considered valid when its first SQL cluster has a sane
    // FE topology: either exactly one master (master-observers mode) or at
    // least one follower and no master (multi-followers mode).
    for (auto& cluster : instance.clusters()) {
        if (!cluster.has_type() || cluster.type() != ClusterPB::SQL) continue;
        int master_cnt = 0;
        int follower_cnt = 0;
        for (auto& node : cluster.nodes()) {
            if (node.node_type() == NodeInfoPB::FE_MASTER) {
                ++master_cnt;
            } else if (node.node_type() == NodeInfoPB::FE_FOLLOWER) {
                ++follower_cnt;
            }
        }
        if (master_cnt > 0) {
            // master-observers mode: exactly one master is required.
            return master_cnt == 1;
        }
        // multi-followers mode: at least one follower is required.
        return follower_cnt > 0;
    }
    // check others ...
    return true;
}
// Adds nodes to, or removes nodes from, the clusters of an instance.
// Exactly one of to_add/to_del must be non-empty per call. On success the
// whole InstanceInfoPB is persisted and the in-memory index is refreshed for
// every modified cluster. Returns "" on success, otherwise an error message.
std::string ResourceManager::modify_nodes(const std::string& instance_id,
                                          const std::vector<NodeInfo>& to_add,
                                          const std::vector<NodeInfo>& to_del) {
    std::string msg;
    std::stringstream ss;
    // RAII defer: always log the outcome message on exit.
    std::unique_ptr<int, std::function<void(int*)>> defer(
            (int*)0x01, [&msg](int*) { LOG(INFO) << "modify_nodes err=" << msg; });

    // This API is strictly add-only or del-only per call.
    if ((to_add.size() && to_del.size()) || (!to_add.size() && !to_del.size())) {
        msg = "to_add and to_del both empty or both not empty";
        LOG(WARNING) << msg;
        return msg;
    }
    std::unique_ptr<Transaction> txn0;
    TxnErrorCode err = txn_kv_->create_txn(&txn0);
    if (err != TxnErrorCode::TXN_OK) {
        msg = "failed to create txn";
        // BUGFIX: previously streamed `msg` where the error code belongs.
        LOG(WARNING) << msg << " err=" << err;
        return msg;
    }
    std::shared_ptr<Transaction> txn(txn0.release());
    InstanceInfoPB instance;
    auto [c0, m0] = get_instance(txn, instance_id, &instance);
    TEST_SYNC_POINT_CALLBACK("modify_nodes:get_instance", &c0, &instance);
    if (c0 != TxnErrorCode::TXN_OK) {
        msg = m0;
        return msg;
    }
    if (instance.status() == InstanceInfoPB::DELETED) {
        msg = "instance status has been set delete, plz check it";
        LOG(WARNING) << msg;
        return msg;
    }
    LOG(INFO) << "instance json=" << proto_to_json(instance);

    // (original cluster, modified cluster) pairs for index maintenance below.
    std::vector<std::pair<ClusterPB, ClusterPB>> vec;
    using modify_impl_func = std::function<std::string(const ClusterPB& c, const NodeInfo& n)>;
    using check_func = std::function<std::string(const NodeInfo& n)>;
    // Runs `check` under the index lock, then applies `action` to every
    // cluster whose name or id matches the node's target cluster.
    auto modify_func = [&](const NodeInfo& node, check_func check,
                           modify_impl_func action) -> std::string {
        std::string cluster_id = node.cluster_id;
        std::string cluster_name = node.cluster_name;
        {
            std::shared_lock l(mtx_);
            msg = check(node);
            if (msg != "") {
                return msg;
            }
        }
        LOG(INFO) << "node to modify json=" << proto_to_json(node.node_info);
        for (auto& c : instance.clusters()) {
            if ((c.has_cluster_name() && c.cluster_name() == cluster_name) ||
                (c.has_cluster_id() && c.cluster_id() == cluster_id)) {
                msg = action(c, node);
                if (msg != "") {
                    return msg;
                }
            }
        }
        return "";
    };

    // Rejects an add when the cloud_unique_id already belongs to another instance.
    check_func check_to_add = [&](const NodeInfo& n) -> std::string {
        std::string err;
        std::stringstream s;
        auto [start, end] = node_info_.equal_range(n.node_info.cloud_unique_id());
        if (start == node_info_.end() || start->first != n.node_info.cloud_unique_id()) {
            return "";
        }
        for (auto it = start; it != end; ++it) {
            if (it->second.instance_id != n.instance_id) {
                // different instance, but has same cloud_unique_id
                s << "cloud_unique_id is already occupied by an instance,"
                  << " instance_id=" << it->second.instance_id
                  << " cluster_name=" << it->second.cluster_name
                  << " cluster_id=" << it->second.cluster_id
                  << " cloud_unique_id=" << n.node_info.cloud_unique_id();
                err = s.str();
                LOG(INFO) << err;
                return err;
            }
        }
        return "";
    };

    // Appends the node to the matched cluster unless the same ip/host:port
    // endpoint is already present there (idempotent replay of the request).
    modify_impl_func modify_to_add = [&](const ClusterPB& c, const NodeInfo& n) -> std::string {
        std::string err;
        std::stringstream s;
        ClusterPB copied_original_cluster;
        ClusterPB copied_cluster;

        // BE nodes identify by heartbeat_port, FE nodes by edit_log_port.
        bool is_compute_node = n.node_info.has_heartbeat_port();
        for (auto it = c.nodes().begin(); it != c.nodes().end(); ++it) {
            if (it->has_ip() && n.node_info.has_ip()) {
                std::string c_endpoint = it->ip() + ":" +
                                         (is_compute_node ? std::to_string(it->heartbeat_port())
                                                          : std::to_string(it->edit_log_port()));
                std::string n_endpoint =
                        n.node_info.ip() + ":" +
                        (is_compute_node ? std::to_string(n.node_info.heartbeat_port())
                                         : std::to_string(n.node_info.edit_log_port()));
                if (c_endpoint == n_endpoint) {
                    // replicate request, do nothing
                    return "";
                }
            }
            if (it->has_host() && n.node_info.has_host()) {
                std::string c_endpoint_host =
                        it->host() + ":" +
                        (is_compute_node ? std::to_string(it->heartbeat_port())
                                         : std::to_string(it->edit_log_port()));
                std::string n_endpoint_host =
                        n.node_info.host() + ":" +
                        (is_compute_node ? std::to_string(n.node_info.heartbeat_port())
                                         : std::to_string(n.node_info.edit_log_port()));
                if (c_endpoint_host == n_endpoint_host) {
                    // replicate request, do nothing
                    return "";
                }
            }
        }

        // add ctime and mtime
        // NOTE(review): const_cast mutates the caller-supplied NodeInfo in place.
        auto& node = const_cast<std::decay_t<decltype(n.node_info)>&>(n.node_info);
        auto now_time = std::chrono::system_clock::now();
        uint64_t time =
                std::chrono::duration_cast<std::chrono::seconds>(now_time.time_since_epoch())
                        .count();
        if (!node.has_ctime()) {
            node.set_ctime(time);
        }
        node.set_mtime(time);
        copied_original_cluster.CopyFrom(c);
        auto& change_cluster = const_cast<std::decay_t<decltype(c)>&>(c);
        change_cluster.add_nodes()->CopyFrom(node);
        copied_cluster.CopyFrom(change_cluster);
        vec.emplace_back(std::move(copied_original_cluster), std::move(copied_cluster));
        return "";
    };

    for (auto& it : to_add) {
        msg = modify_func(it, check_to_add, modify_to_add);
        if (msg != "") {
            LOG(WARNING) << msg;
            return msg;
        }
    }

    // Verifies the node to drop exists in the in-memory index, matched by
    // cloud_unique_id plus ip/host:port endpoint.
    check_func check_to_del = [&](const NodeInfo& n) -> std::string {
        std::string err;
        std::stringstream s;
        auto [start, end] = node_info_.equal_range(n.node_info.cloud_unique_id());
        if (start == node_info_.end() || start->first != n.node_info.cloud_unique_id()) {
            s << "cloud_unique_id can not find to drop node,"
              << " instance_id=" << n.instance_id << " cluster_name=" << n.cluster_name
              << " cluster_id=" << n.cluster_id
              << " cloud_unique_id=" << n.node_info.cloud_unique_id();
            err = s.str();
            LOG(WARNING) << err;
            return err;
        }
        bool found = false;
        for (auto it = start; it != end; ++it) {
            const auto& m_node = it->second.node_info;
            if (m_node.has_ip() && n.node_info.has_ip()) {
                std::string m_endpoint =
                        m_node.ip() + ":" +
                        (m_node.has_heartbeat_port() ? std::to_string(m_node.heartbeat_port())
                                                     : std::to_string(m_node.edit_log_port()));
                std::string n_endpoint = n.node_info.ip() + ":" +
                                         (n.node_info.has_heartbeat_port()
                                                  ? std::to_string(n.node_info.heartbeat_port())
                                                  : std::to_string(n.node_info.edit_log_port()));
                if (m_endpoint == n_endpoint) {
                    found = true;
                    break;
                }
            }
            if (m_node.has_host() && n.node_info.has_host()) {
                std::string m_endpoint_host =
                        m_node.host() + ":" +
                        (m_node.has_heartbeat_port() ? std::to_string(m_node.heartbeat_port())
                                                     : std::to_string(m_node.edit_log_port()));
                std::string n_endpoint_host =
                        n.node_info.host() + ":" +
                        (n.node_info.has_heartbeat_port()
                                 ? std::to_string(n.node_info.heartbeat_port())
                                 : std::to_string(n.node_info.edit_log_port()));
                if (m_endpoint_host == n_endpoint_host) {
                    found = true;
                    break;
                }
            }
        }
        if (!found) {
            s << "cloud_unique_id can not find to drop node,"
              << " instance_id=" << n.instance_id << " cluster_name=" << n.cluster_name
              << " cluster_id=" << n.cluster_id
              << " cloud_unique_id=" << n.node_info.cloud_unique_id();
            err = s.str();
            LOG(WARNING) << err;
            return err;
        }
        return "";
    };

    // Removes the matching node from the cluster's node list. A node that is
    // not present in the cluster is treated as success (idempotent drop).
    modify_impl_func modify_to_del = [&](const ClusterPB& c, const NodeInfo& n) -> std::string {
        std::string err;
        std::stringstream s;
        ClusterPB copied_original_cluster;
        ClusterPB copied_cluster;
        bool found = false;
        int idx = -1;
        const auto& ni = n.node_info;
        for (auto& cn : c.nodes()) {
            idx++;
            if (cn.has_ip() && ni.has_ip()) {
                std::string cn_endpoint =
                        cn.ip() + ":" +
                        (cn.has_heartbeat_port() ? std::to_string(cn.heartbeat_port())
                                                 : std::to_string(cn.edit_log_port()));
                std::string ni_endpoint =
                        ni.ip() + ":" +
                        (ni.has_heartbeat_port() ? std::to_string(ni.heartbeat_port())
                                                 : std::to_string(ni.edit_log_port()));
                if (ni.cloud_unique_id() == cn.cloud_unique_id() && cn_endpoint == ni_endpoint) {
                    found = true;
                    break;
                }
            }
            if (cn.has_host() && ni.has_host()) {
                std::string cn_endpoint_host =
                        cn.host() + ":" +
                        (cn.has_heartbeat_port() ? std::to_string(cn.heartbeat_port())
                                                 : std::to_string(cn.edit_log_port()));
                std::string ni_endpoint_host =
                        ni.host() + ":" +
                        (ni.has_heartbeat_port() ? std::to_string(ni.heartbeat_port())
                                                 : std::to_string(ni.edit_log_port()));
                if (ni.cloud_unique_id() == cn.cloud_unique_id() &&
                    cn_endpoint_host == ni_endpoint_host) {
                    found = true;
                    break;
                }
            }
        }
        if (!found) {
            s << "failed to find node to drop,"
              << " instance_id=" << instance.instance_id() << " cluster_id=" << c.cluster_id()
              << " cluster_name=" << c.cluster_name() << " cluster=" << proto_to_json(c);
            err = s.str();
            LOG(WARNING) << err;
            // not found return ok.
            return "";
        }
        copied_original_cluster.CopyFrom(c);
        auto& change_nodes = const_cast<std::decay_t<decltype(c.nodes())>&>(c.nodes());
        change_nodes.DeleteSubrange(idx, 1); // Remove it
        copied_cluster.CopyFrom(c);
        vec.emplace_back(std::move(copied_original_cluster), std::move(copied_cluster));
        return "";
    };

    for (auto& it : to_del) {
        msg = modify_func(it, check_to_del, modify_to_del);
        if (msg != "") {
            LOG(WARNING) << msg;
            // not found, just return OK to cloud control
            return "";
        }
    }

    LOG(INFO) << "instance " << instance_id << " info: " << instance.DebugString();
    // After removals the remaining FE topology must still be valid.
    if (!to_del.empty() && !is_instance_valid(instance)) {
        msg = "instance invalid, cant modify, plz check";
        LOG(WARNING) << msg;
        return msg;
    }

    // Persist the updated instance record.
    InstanceKeyInfo key_info {instance_id};
    std::string key;
    std::string val;
    instance_key(key_info, &key);
    val = instance.SerializeAsString();
    if (val.empty()) {
        msg = "failed to serialize";
        return msg;
    }
    txn->put(key, val);
    LOG(INFO) << "put instance_key=" << hex(key);
    TxnErrorCode err_code = txn->commit();
    if (err_code != TxnErrorCode::TXN_OK) {
        msg = "failed to commit kv txn";
        LOG(WARNING) << msg << " err=" << err_code;
        return msg;
    }
    // Commit succeeded; refresh the index for every modified cluster.
    for (auto& it : vec) {
        update_cluster_to_index(instance_id, it.first, it.second);
    }
    return "";
}
std::pair<MetaServiceCode, std::string> ResourceManager::refresh_instance(
const std::string& instance_id) {
LOG(INFO) << "begin to refresh instance, instance_id=" << instance_id << " seq=" << ++seq;
std::pair<MetaServiceCode, std::string> ret0 {MetaServiceCode::OK, "OK"};
auto& [code, msg] = ret0;
std::unique_ptr<int, std::function<void(int*)>> defer_log(
(int*)0x01, [&ret0, &instance_id](int*) {
LOG(INFO) << (std::get<0>(ret0) == MetaServiceCode::OK ? "succ to " : "failed to ")
<< "refresh_instance, instance_id=" << instance_id
<< " code=" << std::get<0>(ret0) << " msg=" << std::get<1>(ret0);
});
std::unique_ptr<Transaction> txn0;
TxnErrorCode err = txn_kv_->create_txn(&txn0);
if (err != TxnErrorCode::TXN_OK) {
code = MetaServiceCode::KV_TXN_CREATE_ERR;
msg = "failed to create txn";
LOG(WARNING) << msg << " err=" << err;
return ret0;
}
std::shared_ptr<Transaction> txn(txn0.release());
InstanceInfoPB instance;
auto [c0, m0] = get_instance(txn, instance_id, &instance);
if (c0 != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(c0);
msg = m0;
return ret0;
}
std::vector<ClusterInfo> clusters;
clusters.reserve(instance.clusters_size());
std::lock_guard l(mtx_);
for (auto i = node_info_.begin(); i != node_info_.end();) {
if (i->second.instance_id != instance_id) {
++i;
continue;
}
i = node_info_.erase(i);
}
for (int i = 0; i < instance.clusters_size(); ++i) {
add_cluster_to_index_no_lock(instance_id, instance.clusters(i));
}
LOG(INFO) << "finish refreshing instance, instance_id=" << instance_id << " seq=" << seq;
return ret0;
}
} // namespace doris::cloud