/*
* The MIT License (MIT)
*
* Copyright (c) 2015 Microsoft Corporation
*
* -=- Robust Distributed System Nucleus (rDSN) -=-
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#pragma once
#include <stdint.h>
#include <algorithm>
#include <atomic>
#include <functional>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
#include "common/duplication_common.h"
#include "common/gpid.h"
#include "common/json_helper.h"
#include "common/replication_other_types.h"
#include "dsn.layer2_types.h"
#include "meta/duplication/duplication_info.h"
#include "meta_admin_types.h"
#include "metadata_types.h"
#include "runtime/api_layer1.h"
#include "runtime/rpc/rpc_host_port.h"
#include "runtime/task/task.h"
#include "utils/autoref_ptr.h"
#include "utils/blob.h"
#include "utils/enum_helper.h"
#include "utils/error_code.h"
#include "utils/extensible_object.h"
#include "utils/fmt_logging.h"
#include "utils/utils.h"
namespace dsn {
class message_ex;
namespace replication {
enum class config_status
{
not_pending,
pending_proposal, // deprecated since pegasus v1.8 or an older version
pending_remote_sync,
invalid_status
};
ENUM_BEGIN(config_status, config_status::invalid_status)
ENUM_REG(config_status::not_pending)
ENUM_REG(config_status::pending_proposal)
ENUM_REG(config_status::pending_remote_sync)
ENUM_END(config_status)
enum class pc_status
{
healthy,
ill,
dead,
invalid
};
ENUM_BEGIN(pc_status, pc_status::invalid)
ENUM_REG(pc_status::healthy)
ENUM_REG(pc_status::ill)
ENUM_REG(pc_status::dead)
ENUM_END(pc_status)
class pc_flags
{
public:
static const int dropped = 1;
};
class proposal_actions
{
private:
bool from_balancer;
// used to track the learning process and check whether an abnormal situation happens
bool learning_progress_abnormal_detected;
replica_info current_learner;
// NOTICE:
// the meta service uses configuration_proposal_action::period_ts
// to store an expire timestamp, while an rpc sender uses this field
// to suggest a ttl period
std::vector<configuration_proposal_action> acts;
public:
proposal_actions();
void reset_tracked_current_learner();
void track_current_learner(const host_port &node, const replica_info &info);
void clear();
// return the action in acts & whether the action is from balancer
bool is_from_balancer() const { return from_balancer; }
bool is_abnormal_learning_proposal() const;
void pop_front();
void assign_cure_proposal(const configuration_proposal_action &act);
void assign_balancer_proposals(const std::vector<configuration_proposal_action> &cpa_list);
const configuration_proposal_action *front() const;
bool empty() const;
};
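// Illustrative sketch of the period_ts convention described in the NOTICE above
// (the concrete values and the millisecond unit are assumptions, not taken from
// the implementation):
//
//   // rpc sender side: suggest a ttl period for the proposal
//   configuration_proposal_action act;
//   act.period_ts = 30 * 1000;
//   // meta service side: turn the suggested ttl into an absolute expire timestamp
//   act.period_ts = dsn_now_ms() + act.period_ts;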
//
// structure "dropped_replica" represents a replica which was downgraded to inactive.
// there are 2 sources of dropped replicas:
// 1. recording the meta server's update-cfg actions
// 2. collecting the inactive replicas reported by the replica servers
// generally, we define a partial order on dropped_replica, in which a higher order
// roughly means that the replica has MORE data.
//
// a load balancer may record a list of dropped_replica to track the drop history and use
// it to make cure decisions.
//
// currently dropped_cmp depends on dropped_replica::INVALID_TIMESTAMP being 0;
// if you modify dropped_replica::INVALID_TIMESTAMP, please modify dropped_cmp accordingly.
struct dropped_replica
{
dsn::host_port node;
// if a drop-replica is generated by the update-cfg-req, then we can
// record the drop time (milliseconds)
uint64_t time;
// if a drop-replica comes from the replica server's report, then we can
// record (ballot, last_committed_decree, last_prepared_decree)
//[
int64_t ballot;
int64_t last_committed_decree;
int64_t last_prepared_decree;
//]
static const uint64_t INVALID_TIMESTAMP = 0;
};
// the order of dropped_replica
// ret:
// 0 => equal
// negative => d1 smaller than d2
// positive => d1 larger than d2
inline int dropped_cmp(const dropped_replica &d1, const dropped_replica &d2)
{
if (d1.time != d2.time) {
return (d1.time < d2.time) ? -1 : 1;
}
if (d1.ballot != d2.ballot) {
return d1.ballot < d2.ballot ? -1 : 1;
}
if (d1.last_committed_decree != d2.last_committed_decree) {
return d1.last_committed_decree < d2.last_committed_decree ? -1 : 1;
}
if (d1.last_prepared_decree != d2.last_prepared_decree) {
return d1.last_prepared_decree < d2.last_prepared_decree ? -1 : 1;
}
return 0;
}
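// Illustrative example (host_port values are hypothetical): two replicas dropped
// at unknown times (both carry INVALID_TIMESTAMP) are ordered by ballot first:
//
//   dropped_replica d1{node1, dropped_replica::INVALID_TIMESTAMP, 5, 100, 120};
//   dropped_replica d2{node2, dropped_replica::INVALID_TIMESTAMP, 7, 90, 110};
//   // dropped_cmp(d1, d2) < 0, so d2 ranks higher and is treated as the replica
//   // with MORE data.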
// Represents a serving replica. Info in this structure can only come from the config-sync of
// replica servers. The load balancer may use this to make balance decisions.
struct serving_replica
{
dsn::host_port node;
// TODO: report the storage size of replica
int64_t storage_mb;
std::string disk_tag;
manual_compaction_status::type compact_status;
};
class config_context
{
public:
partition_configuration *config_owner;
config_status stage;
// for server state's update config management
//[
task_ptr pending_sync_task;
std::shared_ptr<configuration_update_request> pending_sync_request;
dsn::message_ex *msg;
//]
// for load balancer's decision
//[
proposal_actions lb_actions;
std::vector<serving_replica> serving;
std::vector<dropped_replica> dropped;
// An index into the vector "dropped".
// Used in the load-balancer's cure to avoid selecting the same learner as
// in a previous unsuccessful proposal.
// Please refer to partition_guardian::on_missing_secondary.
//
// This should always be less than dropped.size().
//
// TODO: a clearer implementation
int32_t prefered_dropped;
//]
public:
void check_size();
void cancel_sync();
std::vector<dropped_replica>::iterator find_from_dropped(const dsn::host_port &node);
std::vector<dropped_replica>::const_iterator find_from_dropped(const host_port &node) const;
// return true if removed ok, false if the node isn't in dropped
bool remove_from_dropped(const dsn::host_port &node);
// put a recently downgraded node into dropped
// return true if put ok, false if the node is already in dropped
bool record_drop_history(const dsn::host_port &node);
// Notice: please make sure whether the node actually holds an inactive or a serving replica
// ret:
// 1 => the node is already in dropped
// 0 => the info was inserted into dropped
// -1 => the info is too stale to insert
int collect_drop_replica(const dsn::host_port &node, const replica_info &info);
// check if the dropped vector satisfies the order
bool check_order();
std::vector<serving_replica>::iterator find_from_serving(const dsn::host_port &node);
std::vector<serving_replica>::const_iterator find_from_serving(const host_port &node) const;
// return true if removed ok, false if the node isn't in serving
bool remove_from_serving(const dsn::host_port &node);
void collect_serving_replica(const dsn::host_port &node, const replica_info &info);
void adjust_proposal(const dsn::host_port &node, const replica_info &info);
bool get_disk_tag(const host_port &node, /*out*/ std::string &disk_tag) const;
public:
// statically initialized to 4,
// and will be set by the load-balancer module
static int MAX_REPLICA_COUNT_IN_GRROUP;
};
struct partition_configuration_stateless
{
partition_configuration &config;
partition_configuration_stateless(partition_configuration &pc) : config(pc) {}
std::vector<dsn::host_port> &workers() { return config.hp_last_drops; }
std::vector<dsn::host_port> &hosts() { return config.hp_secondaries; }
bool is_host(const host_port &node) const
{
return utils::contains(config.hp_secondaries, node);
}
bool is_worker(const host_port &node) const
{
return utils::contains(config.hp_last_drops, node);
}
bool is_member(const host_port &node) const { return is_host(node) || is_worker(node); }
};
struct restore_state
{
// restore_status:
// ERR_OK: the restore hasn't encountered any error
// ERR_CORRUPTION: data on the backup media is damaged and we cannot skip the damaged data,
// so the restore should roll back
// ERR_IGNORE_DAMAGED_DATA: data on the backup media is damaged but we can skip the damaged
// data, so we just skip the damaged partition
dsn::error_code restore_status;
int32_t progress;
std::string reason;
restore_state() : restore_status(dsn::ERR_OK), progress(0), reason() {}
};
// app partition_split states
// when a partition split starts, `splitting_count` will be equal to old_partition_count, and
// <parent_partition_index, SPLITTING> will be inserted into `status`.
// when partition[0] finishes splitting, `splitting_count` decreases and <0, SPLITTING> is
// removed from `status`.
struct split_state
{
int32_t splitting_count;
// partition_index -> split_status
std::map<int32_t, split_status::type> status;
split_state() : splitting_count(0) {}
};
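// Illustrative example (numbers are hypothetical): for an app whose
// old_partition_count is 4, starting a partition split yields
//
//   split_states.splitting_count == 4
//   split_states.status == { {0, SPLITTING}, {1, SPLITTING}, {2, SPLITTING}, {3, SPLITTING} }
//
// and once partition[0] finishes splitting:
//
//   split_states.splitting_count == 3
//   split_states.status == { {1, SPLITTING}, {2, SPLITTING}, {3, SPLITTING} }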
class app_state;
class app_state_helper
{
public:
app_state *owner;
std::atomic_int partitions_in_progress;
std::vector<config_context> contexts;
dsn::message_ex *pending_response;
std::vector<restore_state> restore_states;
split_state split_states;
public:
app_state_helper() : owner(nullptr), partitions_in_progress(0)
{
contexts.clear();
pending_response = nullptr;
}
void on_init_partitions();
void clear_proposals()
{
for (config_context &cc : contexts) {
cc.lb_actions.clear();
}
}
void reset_manual_compact_status();
// get the replica group's manual compaction progress
// return false if the partition is not executing manual compaction
bool get_manual_compact_progress(/*out*/ int32_t &progress) const;
};
/*
* NOTICE: several keys in envs are reserved for recovering from cold_backup:
* envs["block_service_provider"] = <block_service_provider>
* envs["cluster_name"] = <cluster_name>
* envs["policy_name"] = <policy_name>
* envs["app_name"] = <app_name>
* envs["app_id"] = <app_id>
* envs["backup_id"] = <backup_id>
* envs["skip_bad_partition"] = <"true" or "false">
*
* after a newly assigned primary gets these envs from app_info, it will try to
* init a replica with the data stored on the block_device
*/
class app_state : public app_info
{
protected:
std::string log_name;
public:
app_state(const app_info &info);
public:
const char *get_logname() const { return log_name.c_str(); }
std::shared_ptr<app_state_helper> helpers;
std::vector<partition_configuration> partitions;
std::map<dupid_t, duplication_info_s_ptr> duplications;
static std::shared_ptr<app_state> create(const app_info &info);
dsn::blob to_json(app_status::type temp_status)
{
app_info another = *this;
another.status = temp_status;
// persist envs to zookeeper
dsn::blob result = dsn::json::json_forwarder<app_info>::encode(another);
return result;
}
bool splitting() const { return helpers->split_states.splitting_count > 0; }
};
typedef std::set<dsn::gpid> partition_set;
typedef std::map<app_id, std::shared_ptr<app_state>> app_mapper;
class node_state : public extensible_object<node_state, 4>
{
private:
// partitions
std::map<int32_t, partition_set> app_primaries;
std::map<int32_t, partition_set> app_partitions;
unsigned total_primaries;
unsigned total_partitions;
// status
bool is_alive;
bool has_collected_replicas;
dsn::host_port hp;
const partition_set *get_partitions(app_id id, bool only_primary) const;
partition_set *get_partitions(app_id id, bool only_primary, bool create_new);
public:
node_state();
const partition_set *partitions(app_id id, bool only_primary) const;
partition_set *partitions(app_id id, bool only_primary);
unsigned primary_count(app_id id) const;
unsigned secondary_count(app_id id) const { return partition_count(id) - primary_count(id); }
unsigned partition_count(app_id id) const;
unsigned primary_count() const { return total_primaries; }
unsigned secondary_count() const { return total_partitions - total_primaries; }
unsigned partition_count() const { return total_partitions; }
partition_status::type served_as(const gpid &pid) const;
bool alive() const { return is_alive; }
void set_alive(bool alive) { is_alive = alive; }
bool has_collected() { return has_collected_replicas; }
void set_replicas_collect_flag(bool has_collected) { has_collected_replicas = has_collected; }
const dsn::host_port &host_port() const { return hp; }
void set_hp(const dsn::host_port &val) { hp = val; }
void put_partition(const dsn::gpid &pid, bool is_primary);
void remove_partition(const dsn::gpid &pid, bool only_primary);
bool for_each_partition(const std::function<bool(const dsn::gpid &pid)> &f) const;
bool for_each_partition(app_id id, const std::function<bool(const dsn::gpid &)> &f) const;
bool for_each_primary(app_id id, const std::function<bool(const dsn::gpid &pid)> &f) const;
};
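// Illustrative usage sketch for the traversal helpers above (assuming, as the bool
// return values suggest, that returning false from the callback stops the traversal
// early; `ns` is a hypothetical node_state):
//
//   unsigned primaries_of_app_1 = 0;
//   ns.for_each_primary(1, [&primaries_of_app_1](const dsn::gpid &pid) {
//       ++primaries_of_app_1;
//       return true; // keep iterating
//   });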
typedef std::unordered_map<host_port, node_state> node_mapper;
typedef std::map<dsn::gpid, std::shared_ptr<configuration_balancer_request>> migration_list;
struct meta_view
{
app_mapper *apps;
node_mapper *nodes;
};
inline node_state *get_node_state(node_mapper &nodes, const host_port &hp, bool create_new)
{
auto iter = nodes.find(hp);
if (iter == nodes.end()) {
if (!create_new)
return nullptr;
node_state &ns = nodes[hp];
ns.set_hp(hp);
return &ns;
}
return &iter->second;
}
inline bool is_node_alive(const node_mapper &nodes, const host_port &hp)
{
auto iter = nodes.find(hp);
if (iter == nodes.end())
return false;
return iter->second.alive();
}
inline const partition_configuration *get_config(const app_mapper &apps, const dsn::gpid &gpid)
{
auto iter = apps.find(gpid.get_app_id());
if (iter == apps.end() || iter->second->status == app_status::AS_DROPPED)
return nullptr;
return &(iter->second->partitions[gpid.get_partition_index()]);
}
inline partition_configuration *get_config(app_mapper &apps, const dsn::gpid &gpid)
{
auto iter = apps.find(gpid.get_app_id());
if (iter == apps.end() || iter->second->status == app_status::AS_DROPPED)
return nullptr;
return &(iter->second->partitions[gpid.get_partition_index()]);
}
inline const config_context *get_config_context(const app_mapper &apps, const dsn::gpid &gpid)
{
auto iter = apps.find(gpid.get_app_id());
if (iter == apps.end() || iter->second->status == app_status::AS_DROPPED)
return nullptr;
return &(iter->second->helpers->contexts[gpid.get_partition_index()]);
}
inline config_context *get_config_context(app_mapper &apps, const dsn::gpid &gpid)
{
auto iter = apps.find(gpid.get_app_id());
if (iter == apps.end() || iter->second->status == app_status::AS_DROPPED)
return nullptr;
return &(iter->second->helpers->contexts[gpid.get_partition_index()]);
}
inline int replica_count(const partition_configuration &pc)
{
int ans = pc.hp_primary ? 1 : 0;
return ans + pc.hp_secondaries.size();
}
enum health_status
{
HS_DEAD = 0, // (primary = 0 && secondary = 0)
HS_UNREADABLE, // (primary = 0 && secondary > 0)
HS_UNWRITABLE, // (primary = 1 && primary + secondary < mutation_2pc_min_replica_count)
HS_WRITABLE_ILL, // (primary = 1 && primary + secondary >= mutation_2pc_min_replica_count
// && primary + secondary < max_replica_count)
HS_HEALTHY, // (primary = 1 && primary + secondary >= max_replica_count)
HS_MAX_VALUE
};
inline health_status partition_health_status(const partition_configuration &pc,
int mutation_2pc_min_replica_count)
{
if (!pc.hp_primary) {
if (pc.hp_secondaries.empty())
return HS_DEAD;
else
return HS_UNREADABLE;
} else {
int n = pc.hp_secondaries.size() + 1;
if (n < mutation_2pc_min_replica_count)
return HS_UNWRITABLE;
else if (n < pc.max_replica_count)
return HS_WRITABLE_ILL;
else
return HS_HEALTHY;
}
}
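// Illustrative example: for a partition with max_replica_count = 3 and
// mutation_2pc_min_replica_count = 2, the function above yields
//
//   primary unset, no secondary            => HS_DEAD
//   primary unset, 1 secondary             => HS_UNREADABLE
//   primary set,   no secondary  (n = 1)   => HS_UNWRITABLE
//   primary set,   1 secondary   (n = 2)   => HS_WRITABLE_ILL
//   primary set,   2 secondaries (n = 3)   => HS_HEALTHY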
inline void
for_each_available_app(const app_mapper &apps,
const std::function<bool(const std::shared_ptr<app_state> &)> &action)
{
for (const auto &p : apps) {
if (p.second->status == app_status::AS_AVAILABLE) {
if (!action(p.second))
break;
}
}
}
inline int count_partitions(const app_mapper &apps)
{
int result = 0;
for (const auto &iter : apps)
if (iter.second->status == app_status::AS_AVAILABLE)
result += iter.second->partition_count;
return result;
}
void when_update_replicas(config_type::type t, const std::function<void(bool)> &func);
template <typename T>
void maintain_drops(/*inout*/ std::vector<T> &drops, const T &node, config_type::type t)
{
auto action = [&drops, &node](bool is_adding) {
auto it = std::find(drops.begin(), drops.end(), node);
if (is_adding) {
if (it != drops.end()) {
drops.erase(it);
}
} else {
CHECK(
it == drops.end(), "the node({}) cannot be in drops set before this update", node);
drops.push_back(node);
if (drops.size() > 3) {
drops.erase(drops.begin());
}
}
};
when_update_replicas(t, action);
}
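// Illustrative example (node names are hypothetical, and it is assumed that
// when_update_replicas treats a downgrade as a non-adding update): maintain_drops
// keeps only the 3 most recent drops, so starting from drops = [A, B, C],
//
//   maintain_drops(drops, D, config_type::CT_DOWNGRADE_TO_INACTIVE);
//   // drops == [B, C, D]: D is appended and the oldest entry A is evicted.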
// Try to construct a replica-group from the current replica-infos of a gpid
// ret:
// return true if the replica group is constructed successfully.
// Notice: as long as we can construct something from the current infos, we treat it as a
// success
bool construct_replica(meta_view view, const gpid &pid, int max_replica_count);
// When replica infos are collected from replica servers, meta-server
// will use this to check if a replica on a server is useful
// params:
// node: the owner of the replica info
// info: the replica info on node
// ret:
// return true if the replica is accepted as a useful replica, otherwise false.
// WARNING: if false is returned, the replica on node may be garbage-collected
bool collect_replica(meta_view view, const host_port &node, const replica_info &info);
inline bool has_seconds_expired(uint64_t second_ts) { return second_ts * 1000 < dsn_now_ms(); }
inline bool has_milliseconds_expired(uint64_t milliseconds_ts)
{
return milliseconds_ts < dsn_now_ms();
}
} // namespace replication
} // namespace dsn
namespace dsn {
namespace json {
inline void json_encode(dsn::json::JsonWriter &out, const replication::app_state &state)
{
json_forwarder<dsn::app_info>::encode(out, (const dsn::app_info &)state);
}
inline bool json_decode(const dsn::json::JsonObject &in, replication::app_state &state)
{
return json_forwarder<dsn::app_info>::decode(in, (dsn::app_info &)state);
}
} // namespace json
} // namespace dsn