blob: 56cc04e46ba105a07a2c4260bcba0bf03eb5bc91 [file] [log] [blame]
/*
* The MIT License (MIT)
*
* Copyright (c) 2015 Microsoft Corporation
*
* -=- Robust Distributed System Nucleus (rDSN) -=-
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#include "meta/meta_server_failure_detector.h"
#include <chrono>
#include <thread>
#include <utility>
#include "absl/strings/string_view.h"
#include "fd_types.h"
#include "meta/meta_options.h"
#include "meta/meta_service.h"
#include "runtime/app_model.h"
#include "runtime/rpc/rpc_address.h"
#include "runtime/serverlet.h"
#include "runtime/task/task_code.h"
#include "utils/autoref_ptr.h"
#include "utils/distributed_lock_service.h"
#include "utils/error_code.h"
#include "utils/factory_store.h"
#include "utils/fail_point.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"
#include "utils/string_conv.h"
// Threshold of successive "unstable" (too-quick) restarts tolerated per
// ReplicaServer; once reached, update_stability_stat() makes on_ping() drop the
// node's beacons.
// NOTE(review): the registered option name contains a typo ("succssive"), but it
// is the public config key — renaming it would break existing configuration files.
DSN_DEFINE_int32(meta_server,
                 max_succssive_unstable_restart,
                 5,
                 "The maximum number of times a ReplicaServer can be disconnected. If the number "
                 "of a ReplicaServer disconnects exceeds this threshold, MetaServer will add it to "
                 "the blacklist to avoid instability caused by frequent reconnection");
// Minimum uptime (seconds) after which a ReplicaServer run counts as "stable";
// compared in milliseconds in update_stability_stat().
DSN_DEFINE_uint64(meta_server,
                  stable_rs_min_running_seconds,
                  600,
                  "The time threshold for determining whether a ReplicaServer is running stably");
// Which dist::distributed_lock_service implementation backs leader election:
// the in-process simple service under MOCK_TEST builds, zookeeper otherwise.
DSN_DEFINE_string(meta_server,
                  distributed_lock_service_type,
#ifdef MOCK_TEST
                  "distributed_lock_service_simple",
#else
                  "distributed_lock_service_zookeeper",
#endif
                  "The implementation class of distributed lock service");
namespace dsn {
namespace replication {
// Constructs the meta-side failure detector and creates/initializes the
// distributed lock service used for leader election.
// Aborts the process (CHECK_EQ_MSG) if the lock service fails to initialize.
meta_server_failure_detector::meta_server_failure_detector(meta_service *svc)
    : _svc(svc),
      _lock_svc(nullptr),
      _primary_lock_id("dsn.meta.server.leader"),
      _is_leader(false),
      _election_moment(0)
{
    _fd_opts = &(svc->get_meta_options()._fd_opts);
    // Instantiate the lock-service implementation selected by
    // FLAGS_distributed_lock_service_type via the factory registry.
    _lock_svc = dsn::utils::factory_store<dist::distributed_lock_service>::create(
        FLAGS_distributed_lock_service_type, PROVIDER_TYPE_MAIN);
    error_code err = _lock_svc->initialize(_fd_opts->distributed_lock_service_args);
    CHECK_EQ_MSG(err, ERR_OK, "init distributed_lock_service failed");
}
// Tears down leader-election state: cancels any outstanding lock callbacks,
// then finalizes and destroys the lock service (absent in test construction).
meta_server_failure_detector::~meta_server_failure_detector()
{
    // Cancel pending lock-grant / lease-expire callbacks before the lock
    // service goes away; cancel(true) waits for a running callback to finish.
    if (_lock_grant_task) {
        _lock_grant_task->cancel(true);
    }
    if (_lock_expire_task) {
        _lock_expire_task->cancel(true);
    }
    if (_lock_svc) {
        _lock_svc->finalize();
        delete _lock_svc;
    }
}
// Failure-detector callback: the given workers missed their beacons; mark them
// dead in the meta service's node-state table.
void meta_server_failure_detector::on_worker_disconnected(const std::vector<host_port> &nodes)
{
    _svc->set_node_state(nodes, false);
}
// Failure-detector callback: a worker's beacons are flowing again; mark it
// alive in the meta service's node-state table.
void meta_server_failure_detector::on_worker_connected(const host_port &node)
{
    _svc->set_node_state({node}, true);
}
// Resolves the current meta leader.
//
// Returns true iff this node is the leader. On return, *leader holds the
// leader's host_port (reset when it cannot be determined). A null `leader`
// is allowed; an internal holder is used so the lookup still happens.
bool meta_server_failure_detector::get_leader(host_port *leader)
{
    FAIL_POINT_INJECT_F("meta_server_failure_detector_get_leader", [leader](absl::string_view str) {
        /// the format of str is : true#{ip}:{port} or false#{ip}:{port}
        auto pos = str.find("#");
        // get leader host_port
        auto addr_part = str.substr(pos + 1, str.length() - pos - 1);
        // Copy with an explicit length: a string_view is not guaranteed to be
        // NUL-terminated, so passing addr_part.data() alone would rely on the
        // underlying buffer's layout.
        *leader = host_port::from_string(std::string(addr_part.data(), addr_part.size()));
        CHECK(*leader, "parse {} to host_port failed", addr_part);
        // get the return value which implies whether the current node is primary or not
        bool is_leader = true;
        auto is_leader_part = str.substr(0, pos);
        if (!dsn::buf2bool(is_leader_part, is_leader)) {
            CHECK(false, "parse {} to bool failed", is_leader_part);
        }
        return is_leader;
    });

    dsn::host_port holder;
    if (leader == nullptr) {
        leader = &holder;
    }

    // Fast path: this node already knows it holds the leader lock.
    if (_is_leader.load()) {
        *leader = dsn_primary_host_port();
        return true;
    }

    // Test-only construction leaves the lock service absent; no leader known.
    if (_lock_svc == nullptr) {
        leader->reset();
        return false;
    }

    // Ask the lock service's cache who currently owns the leader lock.
    std::string lock_owner;
    uint64_t version;
    error_code err = _lock_svc->query_cache(_primary_lock_id, lock_owner, version);
    if (err != dsn::ERR_OK) {
        LOG_WARNING("query leader from cache got error({})", err);
        leader->reset();
        return false;
    }

    *leader = host_port::from_string(lock_owner);
    if (!(*leader)) {
        leader->reset();
        return false;
    }

    return (*leader) == dsn_primary_host_port();
}
DEFINE_TASK_CODE(LPC_META_SERVER_LEADER_LOCK_CALLBACK, TASK_PRIORITY_COMMON, fd::THREAD_POOL_FD)

// Blocks until this node acquires the leader lock, retrying once per second
// on failure. On success the grant callback runs leader_initialize(), which
// flips _is_leader. If the lease later expires, the expire callback simply
// exits the process so a fresh instance re-elects.
void meta_server_failure_detector::acquire_leader_lock()
{
    //
    // try to get the leader lock until it is done
    //
    dsn::dist::distributed_lock_service::lock_options opt = {true, true};
    while (true) {
        error_code err;
        auto tasks = _lock_svc->lock(
            _primary_lock_id,
            dsn_primary_host_port().to_string(),
            // lock granted
            LPC_META_SERVER_LEADER_LOCK_CALLBACK,
            // `err` is captured by reference; this is safe only because
            // _lock_grant_task->wait() below guarantees the callback has
            // completed before `err` is read.
            [this, &err](error_code ec, const std::string &owner, uint64_t version) {
                LOG_INFO("leader lock granted callback: err({}), owner({}), version({})",
                         ec,
                         owner,
                         version);
                err = ec;
                if (err == dsn::ERR_OK) {
                    leader_initialize(owner);
                }
            },
            // lease expire
            LPC_META_SERVER_LEADER_LOCK_CALLBACK,
            [](error_code ec, const std::string &owner, uint64_t version) {
                LOG_ERROR("leader lock expired callback: err({}), owner({}), version({})",
                          ec,
                          owner,
                          version);
                // let's take the easy way right now
                dsn_exit(0);
            },
            opt);
        _lock_grant_task = tasks.first;
        _lock_expire_task = tasks.second;
        // Synchronize with the grant callback before inspecting `err`.
        _lock_grant_task->wait();
        if (err == ERR_OK) {
            break;
        } else {
            // sleep for 1 second before retry
            std::this_thread::sleep_for(std::chrono::seconds(1));
        }
    }
}
// Administratively clears the unstable-restart counter of `node`, so its
// beacons are accepted again. No-op for nodes with no recorded stat.
void meta_server_failure_detector::reset_stability_stat(const host_port &node)
{
    zauto_lock l(_map_lock);
    const auto entry = _stablity.find(node);
    if (entry == _stablity.end()) {
        return;
    }
    LOG_INFO("old stability stat: node({}), start_time({}), unstable_count({}), will reset "
             "unstable count to 0",
             node,
             entry->second.last_start_time_ms,
             entry->second.unstable_restart_count);
    entry->second.unstable_restart_count = 0;
}
// Called from the lock-grant callback once the leader lock is held.
// Verifies the recorded lock owner is this very node, then flips the
// leader flag and records the election timestamp.
void meta_server_failure_detector::leader_initialize(const std::string &lock_service_owner)
{
    const auto owner = dsn::host_port::from_string(lock_service_owner);
    CHECK(owner, "parse {} to host_port failed", lock_service_owner);
    CHECK_EQ_MSG(
        owner, dsn_primary_host_port(), "acquire leader return success, but owner not match");
    _is_leader.store(true);
    _election_moment.store(dsn_now_ms());
}
// Updates restart-stability bookkeeping for the beacon's sender and returns
// whether the node is still considered stable enough to be acknowledged.
//
// A restart counts as "unstable" when it happens less than
// FLAGS_stable_rs_min_running_seconds after the previous start. Running
// stably for at least that long (with or without a restart) clears the
// counter. Returns false once the counter reaches
// FLAGS_max_succssive_unstable_restart, which makes on_ping() drop the
// node's beacons until reset_stability_stat() or a long stable run.
bool meta_server_failure_detector::update_stability_stat(const fd::beacon_msg &beacon)
{
    zauto_lock l(_map_lock);
    host_port hp_from_node;
    GET_HOST_PORT(beacon, from_node, hp_from_node);
    auto iter = _stablity.find(hp_from_node);
    if (iter == _stablity.end()) {
        // First beacon from this node: record its start time, zero unstable count.
        _stablity.emplace(hp_from_node, worker_stability{beacon.start_time, 0});
        return true;
    } else {
        worker_stability &w = iter->second;
        if (beacon.start_time == w.last_start_time_ms) {
            // Same process instance as last time (no restart).
            LOG_DEBUG("{} isn't restarted, last_start_time({})",
                      FMT_HOST_PORT_AND_IP(beacon, from_node),
                      w.last_start_time_ms);
            // A sufficiently long stable run forgives earlier unstable restarts.
            if (dsn_now_ms() - w.last_start_time_ms >= FLAGS_stable_rs_min_running_seconds * 1000 &&
                w.unstable_restart_count > 0) {
                LOG_INFO("{} has stably run for a while, reset it's unstable count({}) to 0",
                         FMT_HOST_PORT_AND_IP(beacon, from_node),
                         w.unstable_restart_count);
                w.unstable_restart_count = 0;
            }
        } else if (beacon.start_time > w.last_start_time_ms) {
            // The node restarted since its previous beacon.
            LOG_INFO("check {} restarted, last_time({}), this_time({})",
                     FMT_HOST_PORT_AND_IP(beacon, from_node),
                     w.last_start_time_ms,
                     beacon.start_time);
            if (beacon.start_time - w.last_start_time_ms <
                FLAGS_stable_rs_min_running_seconds * 1000) {
                // Restarted too quickly after the previous start: count it.
                w.unstable_restart_count++;
                LOG_WARNING("{} encounter an unstable restart, total_count({})",
                            FMT_HOST_PORT_AND_IP(beacon, from_node),
                            w.unstable_restart_count);
            } else if (w.unstable_restart_count > 0) {
                // Ran long enough before this restart: clear the counter.
                LOG_INFO("{} restart in {} ms after last restart, may recover ok, reset "
                         "it's unstable count({}) to 0",
                         FMT_HOST_PORT_AND_IP(beacon, from_node),
                         beacon.start_time - w.last_start_time_ms,
                         w.unstable_restart_count);
                w.unstable_restart_count = 0;
            }
            w.last_start_time_ms = beacon.start_time;
        } else {
            // Beacon carries an older start time than recorded: stale message.
            LOG_WARNING("{}: possible encounter a staled message, ignore it",
                        FMT_HOST_PORT_AND_IP(beacon, from_node));
        }
        return w.unstable_restart_count < FLAGS_max_succssive_unstable_restart;
    }
}
// RPC handler for a worker's beacon.
//
// Unstable nodes (see update_stability_stat) get no reply at all, forcing
// them to treat this meta as unreachable. Otherwise the ack reports whether
// this node is the leader; when it is not, the ack carries the current
// leader (if resolvable) so the worker can redirect its beacons. Only the
// leader feeds the beacon into the base failure detector.
void meta_server_failure_detector::on_ping(const fd::beacon_msg &beacon,
                                           rpc_replier<fd::beacon_ack> &reply)
{
    // start_time is optional in the thrift message; only check stability
    // when the sender provided it.
    if (beacon.__isset.start_time && !update_stability_stat(beacon)) {
        LOG_WARNING("{} is unstable, don't response to it's beacon",
                    FMT_HOST_PORT_AND_IP(beacon, from_node));
        return;
    }
    fd::beacon_ack ack;
    ack.time = beacon.time;
    SET_OBJ_IP_AND_HOST_PORT(ack, this_node, beacon, to_node);
    ack.allowed = true;
    dsn::host_port leader;
    if (!get_leader(&leader)) {
        // Not the leader: point the worker at the leader we know of (may be
        // reset/empty when unknown).
        ack.is_master = false;
        SET_IP_AND_HOST_PORT_BY_DNS(ack, primary_node, leader);
    } else {
        ack.is_master = true;
        SET_OBJ_IP_AND_HOST_PORT(ack, primary_node, beacon, to_node);
        failure_detector::on_ping_internal(beacon, ack);
    }
    LOG_INFO("on_ping, beacon send time[{}], is_master({}), from_node({}), this_node({}), "
             "primary_node({})",
             ack.time,
             ack.is_master ? "true" : "false",
             FMT_HOST_PORT_AND_IP(beacon, from_node),
             FMT_HOST_PORT_AND_IP(beacon, to_node),
             FMT_HOST_PORT_AND_IP(ack, primary_node));
    reply(ack);
}
/*the following functions are only for test*/
// Test-only constructor: no meta_service and no lock service are attached;
// the leader flag is simply forced to `is_myself_leader`.
// Fix: the original left the raw-pointer member _svc (and _election_moment)
// uninitialized, giving them indeterminate values; initialize every member
// the production constructor initializes (except _primary_lock_id, which is
// unused without a lock service and stays default-empty as before).
meta_server_failure_detector::meta_server_failure_detector(const host_port &leader_host_port,
                                                           bool is_myself_leader)
    : _svc(nullptr), _lock_svc(nullptr), _is_leader(false), _election_moment(0)
{
    LOG_INFO("set {} as leader", leader_host_port);
    _is_leader.store(is_myself_leader);
}
// Test-only: forces this node's leader flag to `is_myself_leader`.
// `leader_host_port` is only logged; it is not stored anywhere.
void meta_server_failure_detector::set_leader_for_test(const host_port &leader_host_port,
                                                       bool is_myself_leader)
{
    LOG_INFO("set {} as leader", leader_host_port);
    _is_leader.store(is_myself_leader);
}
// Test-only: exposes the internal stability map. Callers must not mutate it
// concurrently with beacon processing (it is normally guarded by _map_lock).
meta_server_failure_detector::stability_map *
meta_server_failure_detector::get_stability_map_for_test()
{
    return &_stablity;
}
}
}