| /* |
| * The MIT License (MIT) |
| * |
| * Copyright (c) 2015 Microsoft Corporation |
| * |
| * -=- Robust Distributed System Nucleus (rDSN) -=- |
| * |
| * Permission is hereby granted, free of charge, to any person obtaining a copy |
| * of this software and associated documentation files (the "Software"), to deal |
| * in the Software without restriction, including without limitation the rights |
| * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
| * copies of the Software, and to permit persons to whom the Software is |
| * furnished to do so, subject to the following conditions: |
| * |
| * The above copyright notice and this permission notice shall be included in |
| * all copies or substantial portions of the Software. |
| * |
| * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
| * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
| * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
| * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
| * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
| * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN |
| * THE SOFTWARE. |
| */ |
| |
| /* |
| * Description: |
 * the meta server's data structure, implementation file
| * |
| * Revision history: |
| * 2016-04-25, Weijie Sun(sunweijie at xiaomi.com), first version |
| */ |
| #include <boost/lexical_cast.hpp> |
| |
| #include <dsn/dist/fmt_logging.h> |
| #include <dsn/service_api_cpp.h> |
| #include <dsn/utility/flags.h> |
| |
| #include "meta_data.h" |
| |
| namespace dsn { |
| namespace replication { |
| |
// There is a cluster-level option `max_replicas_in_group` which restricts the max replica
// count of the whole cluster. However, now that the replication factor of each table can be
// updated individually, this cluster-level option should be replaced.
| // |
// Conceptually `max_replicas_in_group` is the total number of alive and dropped replicas. Its
// default value is 4. For a table with replication factor 3, setting `max_replicas_in_group`
// to 4 means 3 alive replicas plus one dropped replica.
| // |
// `max_replicas_in_group` can also be loaded from the configuration file, in which case its
// default value is overridden. The value of `max_replicas_in_group` is assigned to another
// static variable, `MAX_REPLICA_COUNT_IN_GRROUP`, whose default value is also 4.
| // |
// For unit tests, `MAX_REPLICA_COUNT_IN_GRROUP` is kept at the default value 4; for production
// environments, `MAX_REPLICA_COUNT_IN_GRROUP` is set to 3 since `max_replicas_in_group` is
// configured as 3 in the `.ini` file.
| // |
// Since the cluster-level option `max_replicas_in_group` covers both alive and dropped
// replicas, we can use the replication factor of each table as the number of alive replicas,
// and introduce another option, `max_reserved_dropped_replicas`, representing the maximum
// number of dropped replicas allowed to be reserved.
| // |
// If `max_reserved_dropped_replicas` is set to 1, at most one dropped replica is reserved,
// which means, once the number of alive replicas reaches max_replica_count, at most one
// dropped replica can be reserved and the others are eliminated; if it is set to 0, however,
// no dropped replica can be reserved.
| // |
// To be consistent with `max_replicas_in_group`, the default value of
// `max_reserved_dropped_replicas` is set to 1 so that the unit tests pass. For production
// environments, it should be set to 0.
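//
// As an illustration (assuming the usual rDSN flags layout of a `[section]` header followed
// by `flag_name = value` entries, matching the section and name registered below), a
// production `.ini` file would disable reserved dropped replicas with:
//
//   [meta_server]
//     max_reserved_dropped_replicas = 0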
DSN_DEFINE_uint32("meta_server",
                  max_reserved_dropped_replicas,
                  1,
                  "the maximum number of dropped replicas allowed to be reserved");
| DSN_TAG_VARIABLE(max_reserved_dropped_replicas, FT_MUTABLE); |
| |
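// Invokes `func(true)` for configuration changes that make the node a live member of the
// group, and `func(false)` for changes that remove it; other change types are ignored.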
| void when_update_replicas(config_type::type t, const std::function<void(bool)> &func) |
| { |
| switch (t) { |
| case config_type::CT_ASSIGN_PRIMARY: |
| case config_type::CT_UPGRADE_TO_PRIMARY: |
| case config_type::CT_UPGRADE_TO_SECONDARY: |
| func(true); |
| break; |
| case config_type::CT_DOWNGRADE_TO_INACTIVE: |
| case config_type::CT_REMOVE: |
| case config_type::CT_DROP_PARTITION: |
| func(false); |
| break; |
| default: |
| break; |
| } |
| } |
| |
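// Maintains `drops` as a history of nodes from which this partition's replica was dropped,
// ordered from oldest to latest: an update that makes the node a live member again removes
// it from `drops`, while an update that drops the node appends it, keeping at most the 3
// most recent entries.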
| void maintain_drops(std::vector<rpc_address> &drops, const rpc_address &node, config_type::type t) |
| { |
| auto action = [&drops, &node](bool is_adding) { |
| auto it = std::find(drops.begin(), drops.end(), node); |
| if (is_adding) { |
| if (it != drops.end()) { |
| drops.erase(it); |
| } |
| } else { |
            dassert(it == drops.end(),
                    "the node(%s) must not be in the drops set before this update",
                    node.to_string());
| drops.push_back(node); |
| if (drops.size() > 3) { |
| drops.erase(drops.begin()); |
| } |
| } |
| }; |
| when_update_replicas(t, action); |
| } |
| |
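// Rebuilds the partition configuration from the collected dropped replicas when none of the
// replicas is alive: the most recently dropped replica (the back of cc.dropped) is chosen as
// the new primary, and up to max_replica_count-1 of the next most recent nodes are recorded
// in pc.last_drops. Returns false if no dropped replica has been collected for this partition.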
| bool construct_replica(meta_view view, const gpid &pid, int max_replica_count) |
| { |
| partition_configuration &pc = *get_config(*view.apps, pid); |
| config_context &cc = *get_config_context(*view.apps, pid); |
| |
| dassert(replica_count(pc) == 0, |
| "replica count of gpid(%d.%d) must be 0", |
| pid.get_app_id(), |
| pid.get_partition_index()); |
| dassert( |
        max_replica_count > 0, "max replica count is %d, should be at least 1", max_replica_count);
| |
| std::vector<dropped_replica> &drop_list = cc.dropped; |
| if (drop_list.empty()) { |
        dwarn("construct for (%d.%d) failed, because no replicas were collected",
| pid.get_app_id(), |
| pid.get_partition_index()); |
| return false; |
| } |
| |
    // treat the last server in drop_list as the primary
| const dropped_replica &server = drop_list.back(); |
| dassert(server.ballot != invalid_ballot, |
| "the ballot of server must not be invalid_ballot, node = %s", |
| server.node.to_string()); |
| pc.primary = server.node; |
| pc.ballot = server.ballot; |
| pc.partition_flags = 0; |
| pc.max_replica_count = max_replica_count; |
| |
| ddebug("construct for (%d.%d), select %s as primary, ballot(%" PRId64 |
| "), committed_decree(%" PRId64 "), prepare_decree(%" PRId64 ")", |
| pid.get_app_id(), |
| pid.get_partition_index(), |
| server.node.to_string(), |
| server.ballot, |
| server.last_committed_decree, |
| server.last_prepared_decree); |
| |
| drop_list.pop_back(); |
| |
    // We put up to max_replica_count-1 of the most recent replicas into pc.last_drops, to guard
    // against the DDD state that arises when the only primary is dead.
    // When adding a node to pc.last_drops, we don't remove it from cc.dropped.
| dassert(pc.last_drops.empty(), |
| "last_drops of partition(%d.%d) must be empty", |
| pid.get_app_id(), |
| pid.get_partition_index()); |
| for (auto iter = drop_list.rbegin(); iter != drop_list.rend(); ++iter) { |
| if (pc.last_drops.size() + 1 >= max_replica_count) |
| break; |
        // similar to cc.dropped, pc.last_drops is also a stack structure
| pc.last_drops.insert(pc.last_drops.begin(), iter->node); |
| ddebug("construct for (%d.%d), select %s into last_drops, ballot(%" PRId64 |
| "), committed_decree(%" PRId64 "), prepare_decree(%" PRId64 ")", |
| pid.get_app_id(), |
| pid.get_partition_index(), |
| iter->node.to_string(), |
| iter->ballot, |
| iter->last_committed_decree, |
| iter->last_prepared_decree); |
| } |
| |
| cc.prefered_dropped = (int)drop_list.size() - 1; |
| return true; |
| } |
| |
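// Handles a replica of `info.pid` reported by `node`: a serving member of the partition is
// recorded into the serving list, while a non-member is matched against the pending proposal
// and merged into the dropped list. The return value indicates whether the replica is
// considered useful; presumably the caller may treat the replica as garbage when it is false.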
| bool collect_replica(meta_view view, const rpc_address &node, const replica_info &info) |
| { |
| partition_configuration &pc = *get_config(*view.apps, info.pid); |
    // the current partition is in the middle of a partition split
| if (pc.ballot == invalid_ballot) |
| return false; |
| config_context &cc = *get_config_context(*view.apps, info.pid); |
| if (is_member(pc, node)) { |
| cc.collect_serving_replica(node, info); |
| return true; |
| } |
| |
    // compare the node's reported replica information with the current proposal,
    // and try to detect abnormal situations in the proposal that has been sent
| cc.adjust_proposal(node, info); |
| |
| // adjust the drop list |
| int ans = cc.collect_drop_replica(node, info); |
| dassert(cc.check_order(), ""); |
| |
| return info.status == partition_status::PS_POTENTIAL_SECONDARY || ans != -1; |
| } |
| |
| proposal_actions::proposal_actions() : from_balancer(false) { reset_tracked_current_learner(); } |
| |
| void proposal_actions::reset_tracked_current_learner() |
| { |
| learning_progress_abnormal_detected = false; |
| current_learner.ballot = invalid_ballot; |
| current_learner.last_durable_decree = invalid_decree; |
| current_learner.last_committed_decree = invalid_decree; |
| current_learner.last_prepared_decree = invalid_decree; |
| } |
| |
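// Tracks the state reported by the learner targeted by the front add-secondary proposal:
// an error/inactive report after learning has started marks the learning progress as
// abnormal, while a healthy potential-secondary report refreshes the tracked learner state.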
| void proposal_actions::track_current_learner(const dsn::rpc_address &node, const replica_info &info) |
| { |
| if (empty()) |
| return; |
| configuration_proposal_action &act = acts.front(); |
| if (act.node != node) |
| return; |
| |
| // currently we only handle add secondary |
| // TODO: adjust other proposals according to replica info collected |
| if (act.type == config_type::CT_ADD_SECONDARY || |
| act.type == config_type::CT_ADD_SECONDARY_FOR_LB) { |
| |
| if (info.status == partition_status::PS_ERROR || |
| info.status == partition_status::PS_INACTIVE) { |
            // if we've already collected information from the learner but it now claims it's
            // down, we treat the learning process as failed
| if (current_learner.ballot != invalid_ballot) { |
                ddebug("%d.%d: a learner is down with status(%s), perhaps learning failed",
| info.pid.get_app_id(), |
| info.pid.get_partition_index(), |
| dsn::enum_to_string(info.status)); |
| learning_progress_abnormal_detected = true; |
| } else { |
                dinfo("%d.%d: ignore abnormal status of %s, perhaps learning has not started",
| info.pid.get_app_id(), |
| info.pid.get_partition_index(), |
| node.to_string()); |
| } |
| } else if (info.status == partition_status::PS_POTENTIAL_SECONDARY) { |
| if (current_learner.ballot > info.ballot || |
| current_learner.last_committed_decree > info.last_committed_decree || |
| current_learner.last_prepared_decree > info.last_prepared_decree) { |
| |
| // TODO: need to add a perf counter here |
                dwarn("%d.%d: learner(%s)'s progress stepped back, please trace this carefully",
| info.pid.get_app_id(), |
| info.pid.get_partition_index(), |
| node.to_string()); |
| } |
| |
            // NOTICE: the flag may currently be abnormal. It's the balancer's duty to make use
            // of the abnormal flag and decide whether to cancel the proposal.
            // If the balancer gives the proposal another chance, or another learning round
            // starts before the balancer notices it, we simply treat it as normal again.
| learning_progress_abnormal_detected = false; |
| current_learner = info; |
| } |
| } |
| } |
| |
| bool proposal_actions::is_abnormal_learning_proposal() const |
| { |
| if (empty()) |
| return false; |
| if (front()->type != config_type::CT_ADD_SECONDARY && |
| front()->type != config_type::CT_ADD_SECONDARY_FOR_LB) |
| return false; |
| return learning_progress_abnormal_detected; |
| } |
| |
| void proposal_actions::clear() |
| { |
| from_balancer = false; |
| acts.clear(); |
| reset_tracked_current_learner(); |
| } |
| |
| void proposal_actions::pop_front() |
| { |
| if (!acts.empty()) { |
| acts.erase(acts.begin()); |
| reset_tracked_current_learner(); |
| } |
| } |
| |
| const configuration_proposal_action *proposal_actions::front() const |
| { |
| if (acts.empty()) |
| return nullptr; |
| return &acts.front(); |
| } |
| |
| void proposal_actions::assign_cure_proposal(const configuration_proposal_action &act) |
| { |
| from_balancer = false; |
| acts = {act}; |
| reset_tracked_current_learner(); |
| } |
| |
| void proposal_actions::assign_balancer_proposals( |
| const std::vector<configuration_proposal_action> &cpa_list) |
| { |
| from_balancer = true; |
| acts = cpa_list; |
| reset_tracked_current_learner(); |
| } |
| |
| bool proposal_actions::empty() const { return acts.empty(); } |
| |
| int config_context::MAX_REPLICA_COUNT_IN_GRROUP = 4; |
| void config_context::cancel_sync() |
| { |
| if (config_status::pending_remote_sync == stage) { |
| pending_sync_task->cancel(false); |
| pending_sync_task = nullptr; |
| pending_sync_request.reset(); |
| } |
| if (msg) { |
| msg->release_ref(); |
| } |
| msg = nullptr; |
| stage = config_status::not_pending; |
| } |
| |
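// Shrinks the dropped list so that the serving and dropped replicas together stay within
// max_replica_count + FLAGS_max_reserved_dropped_replicas, evicting the oldest entries first.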
| void config_context::check_size() |
| { |
    // when adding a learner, it is possible that replica_count > max_replica_count, so we
    // must only remove entries from dropped when it's not empty.
| while (replica_count(*config_owner) + dropped.size() > |
| config_owner->max_replica_count + FLAGS_max_reserved_dropped_replicas && |
| !dropped.empty()) { |
| dropped.erase(dropped.begin()); |
| prefered_dropped = (int)dropped.size() - 1; |
| } |
| } |
| |
| std::vector<dropped_replica>::iterator config_context::find_from_dropped(const rpc_address &node) |
| { |
| return std::find_if(dropped.begin(), dropped.end(), [&node](const dropped_replica &r) { |
| return r.node == node; |
| }); |
| } |
| |
| std::vector<dropped_replica>::const_iterator |
| config_context::find_from_dropped(const rpc_address &node) const |
| { |
| return std::find_if(dropped.begin(), dropped.end(), [&node](const dropped_replica &r) { |
| return r.node == node; |
| }); |
| } |
| |
| bool config_context::remove_from_dropped(const rpc_address &node) |
| { |
| auto iter = find_from_dropped(node); |
| if (iter != dropped.end()) { |
| dropped.erase(iter); |
| prefered_dropped = (int)dropped.size() - 1; |
| return true; |
| } |
| return false; |
| } |
| |
| bool config_context::record_drop_history(const rpc_address &node) |
| { |
| auto iter = find_from_dropped(node); |
| if (iter != dropped.end()) |
| return false; |
| dropped.emplace_back( |
| dropped_replica{node, dsn_now_ms(), invalid_ballot, invalid_decree, invalid_decree}); |
| prefered_dropped = (int)dropped.size() - 1; |
| check_size(); |
| return true; |
| } |
| |
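// Merges a dropped replica reported by `node` into the dropped list, keeping the list sorted
// by dropped_cmp. Returns 1 if the node was already in the list and has been repositioned,
// 0 if it was newly inserted, and -1 if the newly inserted entry was immediately evicted by
// check_size().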
| int config_context::collect_drop_replica(const rpc_address &node, const replica_info &info) |
| { |
| bool in_dropped = false; |
| auto iter = find_from_dropped(node); |
| uint64_t last_drop_time = dropped_replica::INVALID_TIMESTAMP; |
| if (iter != dropped.end()) { |
| in_dropped = true; |
| last_drop_time = iter->time; |
| dropped.erase(iter); |
| prefered_dropped = (int)dropped.size() - 1; |
| } |
| |
| dropped_replica current = { |
| node, last_drop_time, info.ballot, info.last_committed_decree, info.last_prepared_decree}; |
| auto cmp = [](const dropped_replica &d1, const dropped_replica &d2) { |
| return dropped_cmp(d1, d2) < 0; |
| }; |
| iter = std::lower_bound(dropped.begin(), dropped.end(), current, cmp); |
| |
| dropped.emplace(iter, current); |
| prefered_dropped = (int)dropped.size() - 1; |
| check_size(); |
| |
| iter = find_from_dropped(node); |
| if (iter == dropped.end()) { |
| dassert(!in_dropped, |
| "adjust position of existing node(%s) failed, this is a bug, partition(%d.%d)", |
| node.to_string(), |
| config_owner->pid.get_app_id(), |
| config_owner->pid.get_partition_index()); |
| return -1; |
| } |
| return in_dropped ? 1 : 0; |
| } |
| |
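// Sanity check: verifies that the dropped list is sorted in ascending order of dropped_cmp.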
| bool config_context::check_order() |
| { |
| if (dropped.empty()) |
| return true; |
| for (unsigned int i = 0; i < dropped.size() - 1; ++i) { |
| if (dropped_cmp(dropped[i], dropped[i + 1]) > 0) { |
| derror("check dropped order for gpid(%d.%d) failed, [%s,%llu,%lld,%lld,%lld@%d] vs " |
| "[%s,%llu,%lld,%lld,%lld@%d]", |
| config_owner->pid.get_app_id(), |
| config_owner->pid.get_partition_index(), |
| dropped[i].node.to_string(), |
| dropped[i].time, |
| dropped[i].ballot, |
| dropped[i].last_committed_decree, |
| dropped[i].last_prepared_decree, |
| i, |
                   dropped[i + 1].node.to_string(),
                   dropped[i + 1].time,
                   dropped[i + 1].ballot,
                   dropped[i + 1].last_committed_decree,
                   dropped[i + 1].last_prepared_decree,
| i + 1); |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| std::vector<serving_replica>::iterator config_context::find_from_serving(const rpc_address &node) |
| { |
| return std::find_if(serving.begin(), serving.end(), [&node](const serving_replica &r) { |
| return r.node == node; |
| }); |
| } |
| |
| std::vector<serving_replica>::const_iterator |
| config_context::find_from_serving(const rpc_address &node) const |
| { |
| return std::find_if(serving.begin(), serving.end(), [&node](const serving_replica &r) { |
| return r.node == node; |
| }); |
| } |
| |
| bool config_context::remove_from_serving(const rpc_address &node) |
| { |
| auto iter = find_from_serving(node); |
| if (iter != serving.end()) { |
| serving.erase(iter); |
| return true; |
| } |
| return false; |
| } |
| |
| void config_context::collect_serving_replica(const rpc_address &node, const replica_info &info) |
| { |
| auto iter = find_from_serving(node); |
| auto compact_status = info.__isset.manual_compact_status ? info.manual_compact_status |
| : manual_compaction_status::IDLE; |
| if (iter != serving.end()) { |
| iter->disk_tag = info.disk_tag; |
| iter->storage_mb = 0; |
| iter->compact_status = compact_status; |
| } else { |
| serving.emplace_back(serving_replica{node, 0, info.disk_tag, compact_status}); |
| } |
| } |
| |
| void config_context::adjust_proposal(const rpc_address &node, const replica_info &info) |
| { |
| lb_actions.track_current_learner(node, info); |
| } |
| |
| bool config_context::get_disk_tag(const rpc_address &node, /*out*/ std::string &disk_tag) const |
| { |
| auto iter = find_from_serving(node); |
| if (iter == serving.end()) { |
| return false; |
| } |
| disk_tag = iter->disk_tag; |
| return true; |
| } |
| |
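// Allocates one config_context per partition, binds each context to its
// partition_configuration, and resets the in-progress partition counter and restore states.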
| void app_state_helper::on_init_partitions() |
| { |
| config_context context; |
| context.stage = config_status::not_pending; |
| context.pending_sync_task = nullptr; |
| context.msg = nullptr; |
| |
| context.prefered_dropped = -1; |
| contexts.assign(owner->partition_count, context); |
| |
| std::vector<partition_configuration> &partitions = owner->partitions; |
| for (unsigned int i = 0; i != owner->partition_count; ++i) { |
| contexts[i].config_owner = &(partitions[i]); |
| } |
| |
| partitions_in_progress.store(owner->partition_count); |
| restore_states.resize(owner->partition_count); |
| } |
| |
| void app_state_helper::reset_manual_compact_status() |
| { |
| for (auto &cc : contexts) { |
| for (auto &r : cc.serving) { |
| r.compact_status = manual_compaction_status::IDLE; |
| } |
| } |
| } |
| |
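// Computes the overall progress of manual compaction as the percentage of replicas that have
// finished. Returns false when every replica is idle, i.e. no manual compaction is in progress.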
| bool app_state_helper::get_manual_compact_progress(/*out*/ int32_t &progress) const |
| { |
| int32_t total_replica_count = owner->partition_count * owner->max_replica_count; |
| dassert_f(total_replica_count > 0, |
| "invalid app metadata, app({}), partition_count({}), max_replica_count({})", |
| owner->app_name, |
| owner->partition_count, |
| owner->max_replica_count); |
| int32_t finish_count = 0, idle_count = 0; |
| for (const auto &cc : contexts) { |
| for (const auto &r : cc.serving) { |
| if (r.compact_status == manual_compaction_status::IDLE) { |
| idle_count++; |
| } else if (r.compact_status == manual_compaction_status::FINISHED) { |
| finish_count++; |
| } |
| } |
| } |
| // all replicas of all partitions are idle |
| if (idle_count == total_replica_count) { |
| progress = 0; |
| return false; |
| } |
| progress = finish_count * 100 / total_replica_count; |
| return true; |
| } |
| |
| app_state::app_state(const app_info &info) : app_info(info), helpers(new app_state_helper()) |
| { |
| log_name = info.app_name + "(" + boost::lexical_cast<std::string>(info.app_id) + ")"; |
| helpers->owner = this; |
| |
| partition_configuration config; |
| config.ballot = 0; |
| config.pid.set_app_id(app_id); |
| config.last_committed_decree = 0; |
| config.last_drops.clear(); |
| config.max_replica_count = app_info::max_replica_count; |
| config.primary.set_invalid(); |
| config.secondaries.clear(); |
| partitions.assign(app_info::partition_count, config); |
| for (int i = 0; i != app_info::partition_count; ++i) |
| partitions[i].pid.set_partition_index(i); |
| |
| helpers->on_init_partitions(); |
| } |
| |
| std::shared_ptr<app_state> app_state::create(const app_info &info) |
| { |
| return std::make_shared<app_state>(info); |
| } |
| |
| node_state::node_state() |
| : total_primaries(0), total_partitions(0), is_alive(false), has_collected_replicas(false) |
| { |
| } |
| |
| const partition_set *node_state::get_partitions(int app_id, bool only_primary) const |
| { |
| const std::map<int32_t, partition_set> *all_partitions; |
| if (only_primary) |
| all_partitions = &app_primaries; |
| else |
| all_partitions = &app_partitions; |
| |
| auto iter = all_partitions->find(app_id); |
| if (iter == all_partitions->end()) |
| return nullptr; |
| else |
| return &(iter->second); |
| } |
| |
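// Mutable lookup of the partition set for `id`: when `create_new` is true an empty set is
// created on demand, otherwise nullptr is returned for an unknown app id.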
| partition_set *node_state::get_partitions(app_id id, bool only_primary, bool create_new) |
| { |
| std::map<int32_t, partition_set> *all_partitions; |
| if (only_primary) |
| all_partitions = &app_primaries; |
| else |
| all_partitions = &app_partitions; |
| |
| if (create_new) { |
| return &((*all_partitions)[id]); |
| } else { |
| auto iter = all_partitions->find(id); |
| if (iter == all_partitions->end()) |
| return nullptr; |
| else |
| return &(iter->second); |
| } |
| } |
| |
| partition_set *node_state::partitions(app_id id, bool only_primary) |
| { |
| return const_cast<partition_set *>(get_partitions(id, only_primary)); |
| } |
| |
| const partition_set *node_state::partitions(app_id id, bool only_primary) const |
| { |
| return get_partitions(id, only_primary); |
| } |
| |
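// Registers `pid` under this node, creating the per-app partition sets on demand; the totals
// are only incremented when the insertion actually adds a new entry.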
| void node_state::put_partition(const gpid &pid, bool is_primary) |
| { |
| partition_set *all = get_partitions(pid.get_app_id(), false, true); |
| if ((all->insert(pid)).second) |
| total_partitions++; |
| if (is_primary) { |
| partition_set *pri = get_partitions(pid.get_app_id(), true, true); |
| if ((pri->insert(pid)).second) |
| total_primaries++; |
| } |
| } |
| |
| void node_state::remove_partition(const gpid &pid, bool only_primary) |
| { |
| partition_set *pri = get_partitions(pid.get_app_id(), true, true); |
| total_primaries -= pri->erase(pid); |
| if (!only_primary) { |
| partition_set *all = get_partitions(pid.get_app_id(), false, true); |
| total_partitions -= all->erase(pid); |
| } |
| } |
| |
| bool node_state::for_each_primary(app_id id, const std::function<bool(const gpid &)> &f) const |
| { |
| const partition_set *pri = partitions(id, true); |
| if (pri == nullptr) { |
| return true; |
| } |
| for (const gpid &pid : *pri) { |
| dassert(id == pid.get_app_id(), |
| "invalid gpid(%d.%d), app_id must be %d", |
| pid.get_app_id(), |
| pid.get_partition_index(), |
| id); |
| if (!f(pid)) |
| return false; |
| } |
| return true; |
| } |
| |
| bool node_state::for_each_partition(app_id id, const std::function<bool(const gpid &)> &f) const |
| { |
| const partition_set *par = partitions(id, false); |
| if (par == nullptr) { |
| return true; |
| } |
| for (const gpid &pid : *par) { |
| dassert(id == pid.get_app_id(), |
| "invalid gpid(%d.%d), app_id must be %d", |
| pid.get_app_id(), |
| pid.get_partition_index(), |
| id); |
| if (!f(pid)) |
| return false; |
| } |
| return true; |
| } |
| |
| bool node_state::for_each_partition(const std::function<bool(const gpid &)> &f) const |
| { |
| for (const auto &pair : app_partitions) { |
| const partition_set &ps = pair.second; |
| for (const auto &gpid : ps) { |
| if (!f(gpid)) |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| unsigned node_state::primary_count(app_id id) const |
| { |
| const partition_set *pri = partitions(id, true); |
| if (pri == nullptr) |
| return 0; |
| return pri->size(); |
| } |
| |
| unsigned node_state::partition_count(app_id id) const |
| { |
| const partition_set *pri = partitions(id, false); |
| if (pri == nullptr) |
| return 0; |
| return pri->size(); |
| } |
| |
| partition_status::type node_state::served_as(const gpid &pid) const |
| { |
| const partition_set *ps1 = partitions(pid.get_app_id(), true); |
| if (ps1 != nullptr && ps1->find(pid) != ps1->end()) |
| return partition_status::PS_PRIMARY; |
| const partition_set *ps2 = partitions(pid.get_app_id(), false); |
| if (ps2 != nullptr && ps2->find(pid) != ps2->end()) |
| return partition_status::PS_SECONDARY; |
| return partition_status::PS_INACTIVE; |
| } |
| } // namespace replication |
| } // namespace dsn |