| /* |
| * The MIT License (MIT) |
| * |
| * Copyright (c) 2015 Microsoft Corporation |
| * |
| * -=- Robust Distributed System Nucleus (rDSN) -=- |
| * |
| * Permission is hereby granted, free of charge, to any person obtaining a copy |
| * of this software and associated documentation files (the "Software"), to deal |
| * in the Software without restriction, including without limitation the rights |
| * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
| * copies of the Software, and to permit persons to whom the Software is |
| * furnished to do so, subject to the following conditions: |
| * |
| * The above copyright notice and this permission notice shall be included in |
| * all copies or substantial portions of the Software. |
| * |
| * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
| * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
| * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
| * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
| * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
| * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN |
| * THE SOFTWARE. |
| */ |
| |
| #pragma once |
| |
| #include <stdint.h> |
| #include <algorithm> |
| #include <atomic> |
| #include <functional> |
| #include <map> |
| #include <memory> |
| #include <set> |
| #include <string> |
| #include <unordered_map> |
| #include <utility> |
| #include <vector> |
| |
| #include "common/duplication_common.h" |
| #include "common/gpid.h" |
| #include "common/json_helper.h" |
| #include "common/replication_other_types.h" |
| #include "dsn.layer2_types.h" |
| #include "meta/duplication/duplication_info.h" |
| #include "meta_admin_types.h" |
| #include "metadata_types.h" |
| #include "runtime/api_layer1.h" |
| #include "runtime/rpc/rpc_host_port.h" |
| #include "runtime/task/task.h" |
| #include "utils/autoref_ptr.h" |
| #include "utils/blob.h" |
| #include "utils/enum_helper.h" |
| #include "utils/error_code.h" |
| #include "utils/extensible_object.h" |
| #include "utils/fmt_logging.h" |
| #include "utils/utils.h" |
| |
| namespace dsn { |
| class message_ex; |
| |
| namespace replication { |
| |
| enum class config_status |
| { |
| not_pending, |
| pending_proposal, // deprecated since pegasus v1.8 or older version |
| pending_remote_sync, |
| invalid_status |
| }; |
| |
| ENUM_BEGIN(config_status, config_status::invalid_status) |
| ENUM_REG(config_status::not_pending) |
| ENUM_REG(config_status::pending_proposal) |
| ENUM_REG(config_status::pending_remote_sync) |
| ENUM_END(config_status) |
| |
| enum class pc_status |
| { |
| healthy, |
| ill, |
| dead, |
| invalid |
| }; |
| |
| ENUM_BEGIN(pc_status, pc_status::invalid) |
| ENUM_REG(pc_status::healthy) |
| ENUM_REG(pc_status::ill) |
| ENUM_REG(pc_status::dead) |
| ENUM_END(pc_status) |
| |
| class pc_flags |
| { |
| public: |
| static const int dropped = 1; |
| }; |
| |
| class proposal_actions |
| { |
| private: |
| bool from_balancer; |
| |
| // used for track the learing process and check if abnormal situation happens |
| bool learning_progress_abnormal_detected; |
| replica_info current_learner; |
| |
| // NOTICE: |
| // meta servic use configuration_proposal_action::period_ts |
| // to store a expire timestamp, but a rpc_sender use this field |
| // to suggest a ttl period |
| std::vector<configuration_proposal_action> acts; |
| |
| public: |
| proposal_actions(); |
| void reset_tracked_current_learner(); |
| void track_current_learner(const host_port &node, const replica_info &info); |
| void clear(); |
| |
| // return the action in acts & whether the action is from balancer |
| bool is_from_balancer() const { return from_balancer; } |
| bool is_abnormal_learning_proposal() const; |
| |
| void pop_front(); |
| void assign_cure_proposal(const configuration_proposal_action &act); |
| void assign_balancer_proposals(const std::vector<configuration_proposal_action> &cpa_list); |
| |
| const configuration_proposal_action *front() const; |
| bool empty() const; |
| }; |
| |
| // |
| // structure "dropped_replica" represents a replica which was downgraded to inactive. |
| // there are 2 sources to get the dropped replica: |
| // 1. by record the meta's update-cfg action |
| // 2. by collect the inactive replicas reported from the replica servers |
| // generally, we give a partitial order for the dropped_replica, in which a higher order |
| // roughly means that the replica has MORE data. |
| // |
| // a load balancer may record a list of dropped_replica to track the drop history and use |
| // it to do the cure decision. |
| // |
| |
| // currently dropped_cmp depend on the dropped_replica::INVALID_TIMESTAMP is 0, |
| // if you modify the dropped_replica::INVALID_TIMESTAMP, please modify the dropped_cmp accordingly. |
| struct dropped_replica |
| { |
| dsn::host_port node; |
| |
| // if a drop-replica is generated by the update-cfg-req, then we can |
| // record the drop time (milliseconds) |
| uint64_t time; |
| // if a drop-replica is got from the replica server's report, then we can |
| // record (ballot, commit_decree, prepare_decree) |
| //[ |
| int64_t ballot; |
| int64_t last_committed_decree; |
| int64_t last_prepared_decree; |
| //] |
| static const uint64_t INVALID_TIMESTAMP = 0; |
| }; |
| |
| // the order of dropped_replica |
| // ret: |
| // 0 => equal |
| // negtive => d1 smaller than d2 |
| // positive => d1 larger than d2 |
| inline int dropped_cmp(const dropped_replica &d1, const dropped_replica &d2) |
| { |
| if (d1.time != d2.time) { |
| return (d1.time < d2.time) ? -1 : 1; |
| } |
| if (d1.ballot != d2.ballot) { |
| return d1.ballot < d2.ballot ? -1 : 1; |
| } |
| if (d1.last_committed_decree != d2.last_committed_decree) { |
| return d1.last_committed_decree < d2.last_committed_decree ? -1 : 1; |
| } |
| if (d1.last_prepared_decree != d2.last_prepared_decree) { |
| return d1.last_prepared_decree < d2.last_prepared_decree ? -1 : 1; |
| } |
| return 0; |
| } |
| |
| // Represent a replica that is serving. Info in this structure can only from config-sync of RS. |
| // Load balancer may use this to do balance decisions. |
| struct serving_replica |
| { |
| dsn::host_port node; |
| // TODO: report the storage size of replica |
| int64_t storage_mb; |
| std::string disk_tag; |
| manual_compaction_status::type compact_status; |
| }; |
| |
| class config_context |
| { |
| public: |
| partition_configuration *config_owner; |
| config_status stage; |
| // for server state's update config management |
| //[ |
| task_ptr pending_sync_task; |
| std::shared_ptr<configuration_update_request> pending_sync_request; |
| dsn::message_ex *msg; |
| //] |
| |
| // for load balancer's decision |
| //[ |
| proposal_actions lb_actions; |
| std::vector<serving_replica> serving; |
| std::vector<dropped_replica> dropped; |
| // An index value to the vector "dropped". |
| // Used in load-balancer's cure to avoid select the same learner as |
| // previous unsuccessful proposal. |
| // Please refer to partition_guardian::on_missing_secondary. |
| // |
| // This should always be less than the dropped.size() |
| // |
| // TODO: a more clear implementation |
| int32_t prefered_dropped; |
| //] |
| public: |
| void check_size(); |
| void cancel_sync(); |
| |
| std::vector<dropped_replica>::iterator find_from_dropped(const dsn::host_port &node); |
| std::vector<dropped_replica>::const_iterator find_from_dropped(const host_port &node) const; |
| |
| // return true if remove ok, false if node doesn't in dropped |
| bool remove_from_dropped(const dsn::host_port &node); |
| |
| // put recently downgraded node to dropped |
| // return true if put ok, false if the node has been in dropped |
| bool record_drop_history(const dsn::host_port &node); |
| |
| // Notice: please make sure whether node is actually an inactive or a serving replica |
| // ret: |
| // 1 => node has been in the dropped |
| // 0 => insert the info to the dropped |
| // -1 => info is too staled to insert |
| int collect_drop_replica(const dsn::host_port &node, const replica_info &info); |
| |
| // check if dropped vector satisfied the order |
| bool check_order(); |
| |
| std::vector<serving_replica>::iterator find_from_serving(const dsn::host_port &node); |
| std::vector<serving_replica>::const_iterator find_from_serving(const host_port &node) const; |
| |
| // return true if remove ok, false if node doesn't in serving |
| bool remove_from_serving(const dsn::host_port &node); |
| |
| void collect_serving_replica(const dsn::host_port &node, const replica_info &info); |
| |
| void adjust_proposal(const dsn::host_port &node, const replica_info &info); |
| |
| bool get_disk_tag(const host_port &node, /*out*/ std::string &disk_tag) const; |
| |
| public: |
| // intialize to 4 statically. |
| // and will be set by load-balancer module |
| static int MAX_REPLICA_COUNT_IN_GRROUP; |
| }; |
| |
| struct partition_configuration_stateless |
| { |
| partition_configuration &config; |
| partition_configuration_stateless(partition_configuration &pc) : config(pc) {} |
| std::vector<dsn::host_port> &workers() { return config.hp_last_drops; } |
| std::vector<dsn::host_port> &hosts() { return config.hp_secondaries; } |
| bool is_host(const host_port &node) const |
| { |
| return utils::contains(config.hp_secondaries, node); |
| } |
| bool is_worker(const host_port &node) const |
| { |
| return utils::contains(config.hp_last_drops, node); |
| } |
| bool is_member(const host_port &node) const { return is_host(node) || is_worker(node); } |
| }; |
| |
| struct restore_state |
| { |
| // restore_status: |
| // ERR_OK: restore haven't encounter some error |
| // ERR_CORRUPTION : data on backup media is damaged and we can not skip the damage data, |
| // so should restore rollback |
| // ERR_IGNORE_DAMAGED_DATA : data on backup media is damaged but we can skip the damage |
| // data, so skip the damaged partition |
| dsn::error_code restore_status; |
| int32_t progress; |
| std::string reason; |
| restore_state() : restore_status(dsn::ERR_OK), progress(0), reason() {} |
| }; |
| |
| // app partition_split states |
| // when starting partition split, `splitting_count` will be equal to old_partition_count, |
| // <parent_partition_index, SPLITTING> will be inserted into `status`. |
| // if partition[0] finish split, `splitting_count` will decrease and <0, SPLITTING> will be removed |
| // in `status`. |
| struct split_state |
| { |
| int32_t splitting_count; |
| // partition_index -> split_status |
| std::map<int32_t, split_status::type> status; |
| split_state() : splitting_count(0) {} |
| }; |
| |
| class app_state; |
| |
| class app_state_helper |
| { |
| public: |
| app_state *owner; |
| std::atomic_int partitions_in_progress; |
| std::vector<config_context> contexts; |
| dsn::message_ex *pending_response; |
| std::vector<restore_state> restore_states; |
| split_state split_states; |
| |
| public: |
| app_state_helper() : owner(nullptr), partitions_in_progress(0) |
| { |
| contexts.clear(); |
| pending_response = nullptr; |
| } |
| void on_init_partitions(); |
| void clear_proposals() |
| { |
| for (config_context &cc : contexts) { |
| cc.lb_actions.clear(); |
| } |
| } |
| |
| void reset_manual_compact_status(); |
| // get replica group manual compact progress |
| // return false if partition is not executing manual compaction |
| bool get_manual_compact_progress(/*out*/ int32_t &progress) const; |
| }; |
| |
| /* |
| * NOTICE: several keys in envs are reserved for recover from cold_backup: |
| * envs["block_service_provider"] = <block_service_provider> |
| * envs["cluster_name"] = <cluster_name> |
| * envs["policy_name"] = <policy_name> |
| * envs["app_name"] = <app_name> |
| * envs["app_id"] = <app_id> |
| * envs["backup_id"] = <backup_id> |
| * envs["skip_bad_partition"] = <"true" or "false"> |
| * |
| * after a newly assigned primary get these envs from app_info, it will try to |
| * init a replica with data stored on the block_device |
| */ |
| class app_state : public app_info |
| { |
| protected: |
| std::string log_name; |
| |
| public: |
| app_state(const app_info &info); |
| |
| public: |
| const char *get_logname() const { return log_name.c_str(); } |
| std::shared_ptr<app_state_helper> helpers; |
| std::vector<partition_configuration> partitions; |
| std::map<dupid_t, duplication_info_s_ptr> duplications; |
| |
| static std::shared_ptr<app_state> create(const app_info &info); |
| dsn::blob to_json(app_status::type temp_status) |
| { |
| app_info another = *this; |
| another.status = temp_status; |
| // persistent envs to zookeeper |
| dsn::blob result = dsn::json::json_forwarder<app_info>::encode(another); |
| return result; |
| } |
| bool splitting() const { return helpers->split_states.splitting_count > 0; } |
| }; |
| |
| typedef std::set<dsn::gpid> partition_set; |
| typedef std::map<app_id, std::shared_ptr<app_state>> app_mapper; |
| |
| class node_state : public extensible_object<node_state, 4> |
| { |
| private: |
| // partitions |
| std::map<int32_t, partition_set> app_primaries; |
| std::map<int32_t, partition_set> app_partitions; |
| unsigned total_primaries; |
| unsigned total_partitions; |
| |
| // status |
| bool is_alive; |
| bool has_collected_replicas; |
| dsn::host_port hp; |
| |
| const partition_set *get_partitions(app_id id, bool only_primary) const; |
| partition_set *get_partitions(app_id id, bool only_primary, bool create_new); |
| |
| public: |
| node_state(); |
| const partition_set *partitions(app_id id, bool only_primary) const; |
| partition_set *partitions(app_id id, bool only_primary); |
| |
| unsigned primary_count(app_id id) const; |
| unsigned secondary_count(app_id id) const { return partition_count(id) - primary_count(id); } |
| unsigned partition_count(app_id id) const; |
| |
| unsigned primary_count() const { return total_primaries; } |
| unsigned secondary_count() const { return total_partitions - total_primaries; } |
| unsigned partition_count() const { return total_partitions; } |
| |
| partition_status::type served_as(const gpid &pid) const; |
| |
| bool alive() const { return is_alive; } |
| void set_alive(bool alive) { is_alive = alive; } |
| bool has_collected() { return has_collected_replicas; } |
| void set_replicas_collect_flag(bool has_collected) { has_collected_replicas = has_collected; } |
| const dsn::host_port &host_port() const { return hp; } |
| void set_hp(const dsn::host_port &val) { hp = val; } |
| |
| void put_partition(const dsn::gpid &pid, bool is_primary); |
| void remove_partition(const dsn::gpid &pid, bool only_primary); |
| |
| bool for_each_partition(const std::function<bool(const dsn::gpid &pid)> &f) const; |
| bool for_each_partition(app_id id, const std::function<bool(const dsn::gpid &)> &f) const; |
| bool for_each_primary(app_id id, const std::function<bool(const dsn::gpid &pid)> &f) const; |
| }; |
| |
| typedef std::unordered_map<host_port, node_state> node_mapper; |
| typedef std::map<dsn::gpid, std::shared_ptr<configuration_balancer_request>> migration_list; |
| |
| struct meta_view |
| { |
| app_mapper *apps; |
| node_mapper *nodes; |
| }; |
| |
| inline node_state *get_node_state(node_mapper &nodes, const host_port &hp, bool create_new) |
| { |
| node_state *ns; |
| if (nodes.find(hp) == nodes.end()) { |
| if (!create_new) |
| return nullptr; |
| ns = &nodes[hp]; |
| ns->set_hp(hp); |
| } |
| ns = &nodes[hp]; |
| return ns; |
| } |
| |
| inline bool is_node_alive(const node_mapper &nodes, const host_port &hp) |
| { |
| auto iter = nodes.find(hp); |
| if (iter == nodes.end()) |
| return false; |
| return iter->second.alive(); |
| } |
| |
| inline const partition_configuration *get_config(const app_mapper &apps, const dsn::gpid &gpid) |
| { |
| auto iter = apps.find(gpid.get_app_id()); |
| if (iter == apps.end() || iter->second->status == app_status::AS_DROPPED) |
| return nullptr; |
| return &(iter->second->partitions[gpid.get_partition_index()]); |
| } |
| |
| inline partition_configuration *get_config(app_mapper &apps, const dsn::gpid &gpid) |
| { |
| auto iter = apps.find(gpid.get_app_id()); |
| if (iter == apps.end() || iter->second->status == app_status::AS_DROPPED) |
| return nullptr; |
| return &(iter->second->partitions[gpid.get_partition_index()]); |
| } |
| |
| inline const config_context *get_config_context(const app_mapper &apps, const dsn::gpid &gpid) |
| { |
| auto iter = apps.find(gpid.get_app_id()); |
| if (iter == apps.end() || iter->second->status == app_status::AS_DROPPED) |
| return nullptr; |
| return &(iter->second->helpers->contexts[gpid.get_partition_index()]); |
| } |
| |
| inline config_context *get_config_context(app_mapper &apps, const dsn::gpid &gpid) |
| { |
| auto iter = apps.find(gpid.get_app_id()); |
| if (iter == apps.end() || iter->second->status == app_status::AS_DROPPED) |
| return nullptr; |
| return &(iter->second->helpers->contexts[gpid.get_partition_index()]); |
| } |
| |
| inline int replica_count(const partition_configuration &pc) |
| { |
| int ans = pc.hp_primary ? 1 : 0; |
| return ans + pc.hp_secondaries.size(); |
| } |
| |
| enum health_status |
| { |
| HS_DEAD = 0, // (primary = 0 && secondary = 0) |
| HS_UNREADABLE, // (primary = 0 && secondary > 0) |
| HS_UNWRITABLE, // (primary = 1 && primary + secondary < mutation_2pc_min_replica_count) |
| HS_WRITABLE_ILL, // (primary = 1 && primary + secondary >= mutation_2pc_min_replica_count |
| // && primary + secondary < max_replica_count) |
| HS_HEALTHY, // (primary = 1 && primary + secondary >= max_replica_count) |
| HS_MAX_VALUE |
| }; |
| |
| inline health_status partition_health_status(const partition_configuration &pc, |
| int mutation_2pc_min_replica_count) |
| { |
| if (!pc.hp_primary) { |
| if (pc.hp_secondaries.empty()) |
| return HS_DEAD; |
| else |
| return HS_UNREADABLE; |
| } else { |
| int n = pc.hp_secondaries.size() + 1; |
| if (n < mutation_2pc_min_replica_count) |
| return HS_UNWRITABLE; |
| else if (n < pc.max_replica_count) |
| return HS_WRITABLE_ILL; |
| else |
| return HS_HEALTHY; |
| } |
| } |
| |
| inline void |
| for_each_available_app(const app_mapper &apps, |
| const std::function<bool(const std::shared_ptr<app_state> &)> &action) |
| { |
| for (const auto &p : apps) { |
| if (p.second->status == app_status::AS_AVAILABLE) { |
| if (!action(p.second)) |
| break; |
| } |
| } |
| } |
| |
| inline int count_partitions(const app_mapper &apps) |
| { |
| int result = 0; |
| for (auto iter : apps) |
| if (iter.second->status == app_status::AS_AVAILABLE) |
| result += iter.second->partition_count; |
| return result; |
| } |
| |
| void when_update_replicas(config_type::type t, const std::function<void(bool)> &func); |
| |
| template <typename T> |
| void maintain_drops(/*inout*/ std::vector<T> &drops, const T &node, config_type::type t) |
| { |
| auto action = [&drops, &node](bool is_adding) { |
| auto it = std::find(drops.begin(), drops.end(), node); |
| if (is_adding) { |
| if (it != drops.end()) { |
| drops.erase(it); |
| } |
| } else { |
| CHECK( |
| it == drops.end(), "the node({}) cannot be in drops set before this update", node); |
| drops.push_back(node); |
| if (drops.size() > 3) { |
| drops.erase(drops.begin()); |
| } |
| } |
| }; |
| when_update_replicas(t, action); |
| } |
| |
| // Try to construct a replica-group by current replica-infos of a gpid |
| // ret: |
| // if construct the replica successfully, return true. |
| // Notice: as long as we can construct something from current infos, we treat it as a |
| // success |
| bool construct_replica(meta_view view, const gpid &pid, int max_replica_count); |
| |
| // When replica infos are collected from replica servers, meta-server |
| // will use this to check if a replica on a server is useful |
| // params: |
| // node: the owner of the replica info |
| // info: the replica info on node |
| // ret: |
| // return true if the replica is accepted as an useful replica. Or-else false. |
| // WARNING: if false is returned, the replica on node may be garbage-collected |
| bool collect_replica(meta_view view, const host_port &node, const replica_info &info); |
| |
| inline bool has_seconds_expired(uint64_t second_ts) { return second_ts * 1000 < dsn_now_ms(); } |
| |
| inline bool has_milliseconds_expired(uint64_t milliseconds_ts) |
| { |
| return milliseconds_ts < dsn_now_ms(); |
| } |
| } // namespace replication |
| } // namespace dsn |
| |
| namespace dsn { |
| namespace json { |
| |
| inline void json_encode(dsn::json::JsonWriter &out, const replication::app_state &state) |
| { |
| json_forwarder<dsn::app_info>::encode(out, (const dsn::app_info &)state); |
| } |
| |
| inline bool json_decode(const dsn::json::JsonObject &in, replication::app_state &state) |
| { |
| return json_forwarder<dsn::app_info>::decode(in, (dsn::app_info &)state); |
| } |
| } // namespace json |
| } // namespace dsn |