| /* |
| * The MIT License (MIT) |
| * |
| * Copyright (c) 2015 Microsoft Corporation |
| * |
| * -=- Robust Distributed System Nucleus (rDSN) -=- |
| * |
| * Permission is hereby granted, free of charge, to any person obtaining a copy |
| * of this software and associated documentation files (the "Software"), to deal |
| * in the Software without restriction, including without limitation the rights |
| * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
| * copies of the Software, and to permit persons to whom the Software is |
| * furnished to do so, subject to the following conditions: |
| * |
| * The above copyright notice and this permission notice shall be included in |
| * all copies or substantial portions of the Software. |
| * |
| * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
| * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
| * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
| * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
| * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
| * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN |
| * THE SOFTWARE. |
| */ |
| |
| // IWYU pragma: no_include <ext/alloc_traits.h> |
| #include <stdint.h> |
| #include <stdlib.h> |
| #include <time.h> |
| #include <algorithm> |
| #include <atomic> |
| #include <chrono> |
| #include <functional> |
| #include <map> |
| #include <memory> |
| #include <string> |
| #include <thread> |
| #include <tuple> |
| #include <utility> |
| #include <vector> |
| |
| #include "failure_detector/failure_detector.h" |
| #include "failure_detector/failure_detector_multimaster.h" |
| #include "fd_types.h" |
| #include "gtest/gtest.h" |
| #include "meta/meta_options.h" |
| #include "meta/meta_server_failure_detector.h" |
| #include "replica/replica_stub.h" |
| #include "runtime/api_layer1.h" |
| #include "runtime/rpc/dns_resolver.h" |
| #include "runtime/rpc/group_host_port.h" |
| #include "runtime/rpc/network.h" |
| #include "runtime/rpc/rpc_address.h" |
| #include "runtime/rpc/rpc_host_port.h" |
| #include "runtime/rpc/rpc_message.h" |
| #include "runtime/serverlet.h" |
| #include "runtime/service_app.h" |
| #include "runtime/task/async_calls.h" |
| #include "runtime/task/task_code.h" |
| #include "runtime/task/task_spec.h" |
| #include "utils/enum_helper.h" |
| #include "utils/error_code.h" |
| #include "utils/flags.h" |
| #include "utils/fmt_logging.h" |
| #include "utils/strings.h" |
| #include "utils/zlocks.h" |
| |
| DSN_DECLARE_int32(max_succssive_unstable_restart); |
| DSN_DECLARE_uint64(stable_rs_min_running_seconds); |
| |
| using namespace dsn; |
| using namespace dsn::fd; |
| |
| #define MPORT_START 30001 |
| #define WPORT 40001 |
| #define MCOUNT 3 |
| |
| DEFINE_TASK_CODE_RPC(RPC_MASTER_CONFIG, TASK_PRIORITY_COMMON, THREAD_POOL_FD) |
| |
| std::atomic_int started_apps(0); |
| class worker_fd_test : public ::dsn::dist::slave_failure_detector_with_multimaster |
| { |
| private: |
| volatile bool _send_ping_switch; |
| /* this function only triggerd once*/ |
| std::function<void(host_port addr)> _connected_cb; |
| std::function<void(const std::vector<host_port> &)> _disconnected_cb; |
| |
| protected: |
| virtual void send_beacon(::dsn::host_port node, uint64_t time) override |
| { |
| if (_send_ping_switch) |
| failure_detector::send_beacon(node, time); |
| else { |
| LOG_DEBUG("ignore send beacon, to node[{}], time[{}]", node, time); |
| } |
| } |
| |
| virtual void on_master_disconnected(const std::vector<host_port> &nodes) override |
| { |
| if (_disconnected_cb) |
| _disconnected_cb(nodes); |
| } |
| |
| virtual void on_master_connected(host_port node) override |
| { |
| if (_connected_cb) |
| _connected_cb(node); |
| } |
| |
| public: |
| worker_fd_test(replication::replica_stub *stub, std::vector<dsn::host_port> &meta_servers) |
| : slave_failure_detector_with_multimaster(meta_servers, |
| [=]() { stub->on_meta_server_disconnected(); }, |
| [=]() { stub->on_meta_server_connected(); }) |
| { |
| _send_ping_switch = false; |
| } |
| void toggle_send_ping(bool toggle) { _send_ping_switch = toggle; } |
| void when_connected(const std::function<void(host_port addr)> &func) { _connected_cb = func; } |
| void when_disconnected(const std::function<void(const std::vector<host_port> &nodes)> &func) |
| { |
| _disconnected_cb = func; |
| } |
| void clear() |
| { |
| _connected_cb = {}; |
| _disconnected_cb = {}; |
| } |
| }; |
| |
| class master_fd_test : public replication::meta_server_failure_detector |
| { |
| private: |
| std::function<void(host_port addr)> _connected_cb; |
| std::function<void(const std::vector<host_port> &)> _disconnected_cb; |
| volatile bool _response_ping_switch; |
| |
| public: |
| virtual void on_ping(const beacon_msg &beacon, ::dsn::rpc_replier<beacon_ack> &reply) override |
| { |
| if (_response_ping_switch) |
| meta_server_failure_detector::on_ping(beacon, reply); |
| else { |
| LOG_DEBUG("ignore on ping, beacon msg, time[{}], from[{}], to[{}]", |
| beacon.time, |
| beacon.from_node, |
| beacon.to_node); |
| } |
| } |
| |
| virtual void on_worker_disconnected(const std::vector<host_port> &worker_list) override |
| { |
| if (_disconnected_cb) |
| _disconnected_cb(worker_list); |
| } |
| virtual void on_worker_connected(host_port node) override |
| { |
| if (_connected_cb) |
| _connected_cb(node); |
| } |
| master_fd_test() : meta_server_failure_detector(host_port(), false) |
| { |
| _response_ping_switch = true; |
| } |
| void toggle_response_ping(bool toggle) { _response_ping_switch = toggle; } |
| void when_connected(const std::function<void(host_port addr)> &func) { _connected_cb = func; } |
| void when_disconnected(const std::function<void(const std::vector<host_port> &nodes)> &func) |
| { |
| _disconnected_cb = func; |
| } |
| void test_register_worker(host_port node) |
| { |
| zauto_lock l(failure_detector::_lock); |
| register_worker(node); |
| } |
| void clear() |
| { |
| _connected_cb = {}; |
| _disconnected_cb = {}; |
| } |
| }; |
| |
| class test_worker : public service_app, public serverlet<test_worker> |
| { |
| public: |
| test_worker(const service_app_info *info) : service_app(info), serverlet("test_worker") {} |
| |
| error_code start(const std::vector<std::string> &args) override |
| { |
| std::vector<host_port> master_group; |
| for (int i = 0; i < 3; ++i) |
| master_group.push_back(host_port("localhost", MPORT_START + i)); |
| _worker_fd = new worker_fd_test(nullptr, master_group); |
| _worker_fd->start(1, 1, 9, 10); |
| ++started_apps; |
| |
| register_rpc_handler( |
| RPC_MASTER_CONFIG, "RPC_MASTER_CONFIG", &test_worker::on_master_config); |
| return ERR_OK; |
| } |
| |
| error_code stop(bool) override { return ERR_OK; } |
| |
| void on_master_config(const config_master_message &request, bool &response) |
| { |
| LOG_DEBUG("master config, request: {}, type: {}", |
| request.master, |
| request.is_register ? "reg" : "unreg"); |
| |
| host_port hp_master; |
| GET_HOST_PORT(request, master, hp_master); |
| |
| if (request.is_register) |
| _worker_fd->register_master(hp_master); |
| else |
| _worker_fd->unregister_master(hp_master); |
| response = true; |
| } |
| |
| worker_fd_test *fd() { return _worker_fd; } |
| private: |
| worker_fd_test *_worker_fd; |
| }; |
| |
| class test_master : public service_app |
| { |
| public: |
| test_master(const service_app_info *info) : ::dsn::service_app(info) {} |
| |
| error_code start(const std::vector<std::string> &args) override |
| { |
| FLAGS_stable_rs_min_running_seconds = 10; |
| FLAGS_max_succssive_unstable_restart = 10; |
| _master_fd = new master_fd_test(); |
| _master_fd->set_options(&_opts); |
| bool use_allow_list = false; |
| if (args.size() >= 3 && args[1] == "whitelist") { |
| std::vector<std::string> ports; |
| utils::split_args(args[2].c_str(), ports, ','); |
| for (auto &port : ports) { |
| rpc_address addr(network::get_local_ipv4(), std::stoi(port)); |
| const auto hp = ::dsn::host_port::from_address(addr); |
| _master_fd->add_allow_list(hp); |
| } |
| use_allow_list = true; |
| } |
| |
| _master_fd->start(1, 1, 9, 10, use_allow_list); |
| LOG_DEBUG("{}", _master_fd->get_allow_list({})); |
| ++started_apps; |
| |
| return ERR_OK; |
| } |
| |
| error_code stop(bool) override { return ERR_OK; } |
| |
| master_fd_test *fd() { return _master_fd; } |
| private: |
| master_fd_test *_master_fd; |
| replication::fd_suboptions _opts; |
| }; |
| |
| bool spin_wait_condition(const std::function<bool()> &pred, int seconds) |
| { |
| for (int i = 0; i != seconds; ++i) { |
| if (pred()) |
| return true; |
| std::this_thread::sleep_for(std::chrono::seconds(1)); |
| } |
| return pred(); |
| } |
| |
| void fd_test_init() |
| { |
| dsn::service_app::register_factory<test_worker>("worker"); |
| dsn::service_app::register_factory<test_master>("master"); |
| srand(time(0)); |
| } |
| |
| bool get_worker_and_master(test_worker *&worker, std::vector<test_master *> &masters) |
| { |
| bool ans = spin_wait_condition([]() { return started_apps == MCOUNT + 1; }, 30); |
| if (!ans) |
| return false; |
| |
| std::vector<service_app *> apps; |
| service_app::get_all_service_apps(&apps); |
| masters.resize(MCOUNT, nullptr); |
| worker = nullptr; |
| |
| for (int i = 0; i != apps.size(); ++i) { |
| if (apps[i]->info().type == "worker") { |
| if (worker != nullptr) |
| return false; |
| worker = reinterpret_cast<test_worker *>(apps[i]); |
| } else if (apps[i]->info().type == "master") { |
| int index = apps[i]->info().index - 1; |
| if (index >= masters.size() || masters[index] != nullptr) |
| return false; |
| masters[index] = reinterpret_cast<test_master *>(apps[i]); |
| } |
| } |
| |
| for (test_master *m : masters) |
| if (m == nullptr) |
| return false; |
| return true; |
| } |
| |
| void master_group_set_leader(std::vector<test_master *> &master_group, int leader_index) |
| { |
| const auto hp_leader = host_port("localhost", MPORT_START + leader_index); |
| int i = 0; |
| for (test_master *&master : master_group) { |
| master->fd()->set_leader_for_test(hp_leader, leader_index == i); |
| i++; |
| } |
| } |
| |
| void worker_set_leader(test_worker *worker, int leader_contact) |
| { |
| worker->fd()->set_leader_for_test(host_port("localhost", MPORT_START + leader_contact)); |
| |
| config_master_message msg; |
| msg.master = rpc_address::from_host_port("localhost", MPORT_START + leader_contact); |
| msg.is_register = true; |
| msg.__set_hp_master(host_port::from_address(msg.master)); |
| error_code err; |
| bool response; |
| std::tie(err, response) = rpc::call_wait<bool>( |
| rpc_address::from_host_port("localhost", WPORT), dsn::task_code(RPC_MASTER_CONFIG), msg); |
| ASSERT_EQ(err, ERR_OK); |
| } |
| |
| void clear(test_worker *worker, std::vector<test_master *> masters) |
| { |
| const auto &hp_leader = worker->fd()->get_servers().group_host_port()->leader(); |
| const auto &leader = dsn::dns_resolver::instance().resolve_address(hp_leader); |
| |
| config_master_message msg; |
| msg.master = leader; |
| msg.is_register = false; |
| msg.__set_hp_master(hp_leader); |
| error_code err; |
| bool response; |
| std::tie(err, response) = rpc::call_wait<bool>( |
| rpc_address::from_host_port("localhost", WPORT), dsn::task_code(RPC_MASTER_CONFIG), msg); |
| ASSERT_EQ(err, ERR_OK); |
| |
| worker->fd()->toggle_send_ping(false); |
| |
| std::for_each(masters.begin(), masters.end(), [](test_master *mst) { |
| mst->fd()->clear_workers(); |
| mst->fd()->toggle_response_ping(true); |
| }); |
| } |
| |
| void finish(test_worker *worker, test_master *master, int master_index) |
| { |
| LOG_WARNING("start to finish"); |
| std::atomic_int wait_count; |
| wait_count.store(2); |
| worker->fd()->when_disconnected( |
| [&wait_count, master_index](const std::vector<host_port> &addr_list) mutable { |
| ASSERT_EQ(addr_list.size(), 1); |
| ASSERT_EQ(addr_list[0].port(), MPORT_START + master_index); |
| --wait_count; |
| }); |
| |
| master->fd()->when_disconnected([&wait_count](const std::vector<host_port> &addr_list) mutable { |
| ASSERT_EQ(addr_list.size(), 1); |
| ASSERT_EQ(addr_list[0].port(), WPORT); |
| --wait_count; |
| }); |
| |
| // we don't send any ping message now |
| worker->fd()->toggle_send_ping(false); |
| ASSERT_TRUE(spin_wait_condition([&wait_count] { return wait_count == 0; }, 20)); |
| worker->fd()->clear(); |
| master->fd()->clear(); |
| } |
| |
| TEST(fd, dummy_connect_disconnect) |
| { |
| test_worker *worker; |
| std::vector<test_master *> masters; |
| ASSERT_TRUE(get_worker_and_master(worker, masters)); |
| |
| clear(worker, masters); |
| // set master with smallest index as the leader |
| master_group_set_leader(masters, 0); |
| // set the worker contact leader |
| worker_set_leader(worker, 0); |
| |
| test_master *leader = masters[0]; |
| // simply wait for two connected |
| std::atomic_int wait_count; |
| wait_count.store(2); |
| worker->fd()->when_connected([&wait_count](host_port leader) mutable { |
| ASSERT_EQ(leader.port(), MPORT_START); |
| --wait_count; |
| }); |
| leader->fd()->when_connected([&wait_count](host_port worker_addr) mutable { |
| ASSERT_EQ(worker_addr.port(), WPORT); |
| --wait_count; |
| }); |
| |
| worker->fd()->toggle_send_ping(true); |
| ASSERT_TRUE(spin_wait_condition([&wait_count] { return wait_count == 0; }, 20)); |
| |
| finish(worker, leader, 0); |
| } |
| |
| TEST(fd, master_redirect) |
| { |
| test_worker *worker; |
| std::vector<test_master *> masters; |
| ASSERT_TRUE(get_worker_and_master(worker, masters)); |
| |
| int index = masters.size() - 1; |
| |
| clear(worker, masters); |
| /* leader is the last master*/ |
| master_group_set_leader(masters, index); |
| // we contact to 0 |
| worker_set_leader(worker, 0); |
| |
| test_master *leader = masters[index]; |
| std::atomic_int wait_count; |
| wait_count.store(2); |
| /* although we contact to the first master, but in the end we must connect to the right leader |
| */ |
| worker->fd()->when_connected([&wait_count](host_port leader) mutable { --wait_count; }); |
| leader->fd()->when_connected([&wait_count](host_port worker_addr) mutable { |
| ASSERT_EQ(worker_addr.port(), WPORT); |
| --wait_count; |
| }); |
| |
| worker->fd()->toggle_send_ping(true); |
| ASSERT_TRUE(spin_wait_condition([&wait_count] { return wait_count == 0; }, 20)); |
| // in the end, the worker will connect to the right master |
| ASSERT_TRUE(spin_wait_condition( |
| [worker, index] { |
| return worker->fd()->current_server_contact().port() == MPORT_START + index; |
| }, |
| 20)); |
| |
| finish(worker, leader, index); |
| } |
| |
| TEST(fd, switch_new_master_suddenly) |
| { |
| test_worker *worker; |
| std::vector<test_master *> masters; |
| ASSERT_TRUE(get_worker_and_master(worker, masters)); |
| |
| clear(worker, masters); |
| |
| test_master *tst_master; |
| int index = 0; |
| |
| master_group_set_leader(masters, index); |
| // and now we contact to 1 |
| worker_set_leader(worker, 1); |
| |
| tst_master = masters[index]; |
| std::atomic_int wait_count; |
| wait_count.store(2); |
| |
| auto cb = [&wait_count](host_port) mutable { --wait_count; }; |
| worker->fd()->when_connected(cb); |
| tst_master->fd()->when_connected(cb); |
| |
| worker->fd()->toggle_send_ping(true); |
| ASSERT_TRUE(spin_wait_condition([&wait_count]() { return wait_count == 0; }, 20)); |
| ASSERT_EQ(worker->fd()->current_server_contact().port(), MPORT_START + index); |
| |
| worker->fd()->when_connected(nullptr); |
| /* we select a new leader */ |
| index = masters.size() - 1; |
| tst_master = masters[index]; |
| /* |
| * for perfect FD, the new master should assume the worker connected. |
| * But first we test if the worker can connect to the new master. |
| * So clear all the workers |
| */ |
| tst_master->fd()->clear_workers(); |
| wait_count.store(1); |
| tst_master->fd()->when_connected([&wait_count](host_port addr) mutable { |
| ASSERT_EQ(addr.port(), WPORT); |
| --wait_count; |
| }); |
| master_group_set_leader(masters, index); |
| |
| /* now we can worker the worker to connect to the new master */ |
| ASSERT_TRUE(spin_wait_condition([&wait_count]() { return wait_count == 0; }, 20)); |
| /* it may takes time for worker to switch to new master, but 20 seconds |
| * is enough as in our setting, lease_period is 9 seconds. */ |
| ASSERT_TRUE(spin_wait_condition( |
| [worker, index]() { |
| return worker->fd()->current_server_contact().port() == MPORT_START + index; |
| }, |
| 20)); |
| |
| finish(worker, tst_master, index); |
| } |
| |
| TEST(fd, old_master_died) |
| { |
| test_worker *worker; |
| std::vector<test_master *> masters; |
| ASSERT_TRUE(get_worker_and_master(worker, masters)); |
| clear(worker, masters); |
| |
| test_master *tst_master; |
| int index = 0; |
| master_group_set_leader(masters, index); |
| // and now we contact to 0 |
| worker_set_leader(worker, 0); |
| |
| tst_master = masters[index]; |
| std::atomic_int wait_count; |
| wait_count.store(2); |
| |
| auto cb = [&wait_count](host_port) mutable { --wait_count; }; |
| worker->fd()->when_connected(cb); |
| tst_master->fd()->when_connected(cb); |
| |
| worker->fd()->toggle_send_ping(true); |
| ASSERT_TRUE(spin_wait_condition([&wait_count]() -> bool { return wait_count == 0; }, 20)); |
| ASSERT_EQ(worker->fd()->current_server_contact().port(), MPORT_START + index); |
| |
| worker->fd()->when_connected(nullptr); |
| tst_master->fd()->when_connected(nullptr); |
| |
| worker->fd()->when_disconnected([](const std::vector<host_port> &masters_list) { |
| ASSERT_EQ(masters_list.size(), 1); |
| LOG_DEBUG("disconnect from master: {}", masters_list[0]); |
| }); |
| |
| /*first let's stop the old master*/ |
| tst_master->fd()->toggle_response_ping(false); |
| /* then select a new one */ |
| index = masters.size() - 1; |
| tst_master = masters[index]; |
| |
| /* only for test */ |
| tst_master->fd()->clear_workers(); |
| wait_count.store(1); |
| |
| tst_master->fd()->when_connected([&wait_count](host_port addr) mutable { |
| EXPECT_EQ(addr.port(), WPORT); |
| --wait_count; |
| }); |
| master_group_set_leader(masters, index); |
| |
| /* now we can wait the worker to connect to the new master */ |
| ASSERT_TRUE(spin_wait_condition([&wait_count]() { return wait_count == 0; }, 20)); |
| /* it may takes time for worker to switch to new master, but 20 seconds |
| * is enough as in our setting, lease_period is 9 seconds. */ |
| ASSERT_TRUE(spin_wait_condition( |
| [worker, index]() { |
| return worker->fd()->current_server_contact().port() == MPORT_START + index; |
| }, |
| 20)); |
| |
| finish(worker, tst_master, index); |
| } |
| |
| TEST(fd, worker_died_when_switch_master) |
| { |
| test_worker *worker; |
| std::vector<test_master *> masters; |
| ASSERT_TRUE(get_worker_and_master(worker, masters)); |
| clear(worker, masters); |
| |
| test_master *tst_master; |
| int index = 0; |
| master_group_set_leader(masters, index); |
| // and now we contact to 0 |
| worker_set_leader(worker, 0); |
| |
| tst_master = masters[index]; |
| std::atomic_int wait_count; |
| wait_count.store(2); |
| |
| auto cb = [&wait_count](host_port) mutable { --wait_count; }; |
| worker->fd()->when_connected(cb); |
| tst_master->fd()->when_connected(cb); |
| |
| worker->fd()->toggle_send_ping(true); |
| ASSERT_TRUE(spin_wait_condition([&wait_count]() { return wait_count == 0; }, 20)); |
| ASSERT_EQ(worker->fd()->current_server_contact().port(), MPORT_START + index); |
| |
| worker->fd()->when_connected(nullptr); |
| tst_master->fd()->when_connected(nullptr); |
| |
| /*first stop the old leader*/ |
| tst_master->fd()->toggle_response_ping(false); |
| |
| /*then select another leader*/ |
| index = masters.size() - 1; |
| tst_master = masters[index]; |
| |
| wait_count.store(2); |
| tst_master->fd()->when_disconnected( |
| [&wait_count](const std::vector<host_port> &worker_list) mutable { |
| ASSERT_EQ(worker_list.size(), 1); |
| ASSERT_EQ(worker_list[0].port(), WPORT); |
| wait_count--; |
| }); |
| worker->fd()->when_disconnected( |
| [&wait_count](const std::vector<host_port> &master_list) mutable { |
| ASSERT_EQ(master_list.size(), 1); |
| wait_count--; |
| }); |
| |
| /* we assume the worker is alive */ |
| tst_master->fd()->test_register_worker(host_port("localhost", WPORT)); |
| master_group_set_leader(masters, index); |
| |
| /* then stop the worker*/ |
| worker->fd()->toggle_send_ping(false); |
| ASSERT_TRUE(spin_wait_condition([&wait_count] { return wait_count == 0; }, 20)); |
| } |
| |
| dsn::message_ex *create_fake_rpc_response() |
| { |
| dsn::message_ex *req = |
| dsn::message_ex::create_received_request(RPC_MASTER_CONFIG, DSF_THRIFT_BINARY, nullptr, 0); |
| dsn::message_ex *response = req->create_response(); |
| req->add_ref(); |
| req->release_ref(); |
| return response; |
| } |
| |
| TEST(fd, update_stability) |
| { |
| test_worker *worker; |
| std::vector<test_master *> masters; |
| ASSERT_TRUE(get_worker_and_master(worker, masters)); |
| clear(worker, masters); |
| |
| master_group_set_leader(masters, 0); |
| master_fd_test *fd = masters[0]->fd(); |
| fd->toggle_response_ping(true); |
| |
| replication::fd_suboptions opts; |
| FLAGS_stable_rs_min_running_seconds = 5; |
| FLAGS_max_succssive_unstable_restart = 2; |
| fd->set_options(&opts); |
| |
| replication::meta_server_failure_detector::stability_map *smap = |
| fd->get_stability_map_for_test(); |
| smap->clear(); |
| |
| dsn::rpc_replier<beacon_ack> r(create_fake_rpc_response()); |
| beacon_msg msg; |
| msg.from_node = rpc_address::from_host_port("localhost", 123); |
| msg.__set_hp_from_node(host_port("localhost", 123)); |
| msg.to_node = rpc_address::from_host_port("localhost", MPORT_START); |
| msg.__set_hp_to_node(host_port("localhost", MPORT_START)); |
| msg.time = dsn_now_ms(); |
| msg.__isset.start_time = true; |
| msg.start_time = 1000; |
| |
| // first on ping |
| fd->on_ping(msg, r); |
| ASSERT_EQ(1, smap->size()); |
| ASSERT_NE(smap->end(), smap->find(msg.hp_from_node)); |
| |
| replication::meta_server_failure_detector::worker_stability &ws = |
| smap->find(msg.hp_from_node)->second; |
| ASSERT_EQ(0, ws.unstable_restart_count); |
| ASSERT_EQ(msg.start_time, ws.last_start_time_ms); |
| ASSERT_TRUE(r.is_empty()); |
| r = dsn::rpc_replier<beacon_ack>(create_fake_rpc_response()); |
| |
| // unstably restart and resend ping |
| msg.start_time += 4000; |
| fd->on_ping(msg, r); |
| ASSERT_EQ(1, ws.unstable_restart_count); |
| ASSERT_EQ(msg.start_time, ws.last_start_time_ms); |
| ASSERT_TRUE(r.is_empty()); |
| r = dsn::rpc_replier<beacon_ack>(create_fake_rpc_response()); |
| |
| // upstably restart and resend ping again, the node's ping will be ignored |
| msg.start_time += 4000; |
| fd->on_ping(msg, r); |
| ASSERT_EQ(msg.start_time, ws.last_start_time_ms); |
| ASSERT_EQ(2, ws.unstable_restart_count); |
| ASSERT_FALSE(r.is_empty()); |
| |
| // stably restart, the meta stability & unstable_count will be reset |
| msg.start_time += 10000; |
| fd->on_ping(msg, r); |
| ASSERT_EQ(msg.start_time, ws.last_start_time_ms); |
| ASSERT_EQ(0, ws.unstable_restart_count); |
| ASSERT_TRUE(r.is_empty()); |
| r = dsn::rpc_replier<beacon_ack>(create_fake_rpc_response()); |
| |
| // unstably restart, unstable-count++ |
| msg.start_time += 4000; |
| fd->on_ping(msg, r); |
| ASSERT_EQ(msg.start_time, ws.last_start_time_ms); |
| ASSERT_EQ(1, ws.unstable_restart_count); |
| ASSERT_TRUE(r.is_empty()); |
| r = dsn::rpc_replier<beacon_ack>(create_fake_rpc_response()); |
| |
| // not restart, unstable-count will be reset |
| fd->on_ping(msg, r); |
| ASSERT_EQ(msg.start_time, ws.last_start_time_ms); |
| ASSERT_EQ(0, ws.unstable_restart_count); |
| ASSERT_TRUE(r.is_empty()); |
| r = dsn::rpc_replier<beacon_ack>(create_fake_rpc_response()); |
| |
| // old message, will be ignored |
| msg.start_time -= 4000; |
| fd->on_ping(msg, r); |
| ASSERT_EQ(msg.start_time + 4000, ws.last_start_time_ms); |
| ASSERT_EQ(0, ws.unstable_restart_count); |
| ASSERT_TRUE(r.is_empty()); |
| r = dsn::rpc_replier<beacon_ack>(create_fake_rpc_response()); |
| |
| // unstable restart, unstable-count++ |
| msg.start_time += 8000; |
| fd->on_ping(msg, r); |
| ASSERT_EQ(msg.start_time, ws.last_start_time_ms); |
| ASSERT_EQ(1, ws.unstable_restart_count); |
| ASSERT_TRUE(r.is_empty()); |
| r = dsn::rpc_replier<beacon_ack>(create_fake_rpc_response()); |
| |
| // unstable restart, unstable-count++, node's ping will be ignored |
| msg.start_time += 4000; |
| fd->on_ping(msg, r); |
| ASSERT_EQ(msg.start_time, ws.last_start_time_ms); |
| ASSERT_EQ(2, ws.unstable_restart_count); |
| ASSERT_FALSE(r.is_empty()); |
| |
| // reset stat |
| fd->reset_stability_stat(msg.hp_from_node); |
| ASSERT_EQ(msg.start_time, ws.last_start_time_ms); |
| ASSERT_EQ(0, ws.unstable_restart_count); |
| } |
| |
| TEST(fd, not_in_whitelist) |
| { |
| test_worker *worker; |
| std::vector<test_master *> masters; |
| ASSERT_TRUE(get_worker_and_master(worker, masters)); |
| |
| clear(worker, masters); |
| // set master with smallest index as the leader |
| master_group_set_leader(masters, 0); |
| // set the worker contact leader |
| worker_set_leader(worker, 0); |
| |
| std::atomic_int wait_count; |
| wait_count.store(1); |
| auto cb = [&wait_count](host_port) mutable { --wait_count; }; |
| worker->fd()->when_connected(cb); |
| worker->fd()->toggle_send_ping(true); |
| |
| ASSERT_TRUE(spin_wait_condition([&wait_count] { return wait_count == 1; }, 20)); |
| } |