blob: 49c223e89b7dd762b49ac2258a77b2e3babbbd14 [file] [log] [blame]
/*
* The MIT License (MIT)
*
* Copyright (c) 2015 Microsoft Corporation
*
* -=- Robust Distributed System Nucleus (rDSN) -=-
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
// IWYU pragma: no_include <ext/alloc_traits.h>
#include <stdint.h>
#include <stdlib.h>
#include <time.h>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <thread>
#include <tuple>
#include <utility>
#include <vector>
#include "failure_detector/failure_detector.h"
#include "failure_detector/failure_detector_multimaster.h"
#include "fd_types.h"
#include "gtest/gtest.h"
#include "meta/meta_options.h"
#include "meta/meta_server_failure_detector.h"
#include "replica/replica_stub.h"
#include "runtime/api_layer1.h"
#include "runtime/rpc/dns_resolver.h"
#include "runtime/rpc/group_host_port.h"
#include "runtime/rpc/network.h"
#include "runtime/rpc/rpc_address.h"
#include "runtime/rpc/rpc_host_port.h"
#include "runtime/rpc/rpc_message.h"
#include "runtime/serverlet.h"
#include "runtime/service_app.h"
#include "runtime/task/async_calls.h"
#include "runtime/task/task_code.h"
#include "runtime/task/task_spec.h"
#include "utils/enum_helper.h"
#include "utils/error_code.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"
#include "utils/strings.h"
#include "utils/zlocks.h"
DSN_DECLARE_int32(max_succssive_unstable_restart);
DSN_DECLARE_uint64(stable_rs_min_running_seconds);
using namespace dsn;
using namespace dsn::fd;
#define MPORT_START 30001
#define WPORT 40001
#define MCOUNT 3
DEFINE_TASK_CODE_RPC(RPC_MASTER_CONFIG, TASK_PRIORITY_COMMON, THREAD_POOL_FD)
std::atomic_int started_apps(0);
class worker_fd_test : public ::dsn::dist::slave_failure_detector_with_multimaster
{
private:
volatile bool _send_ping_switch;
/* this function only triggerd once*/
std::function<void(host_port addr)> _connected_cb;
std::function<void(const std::vector<host_port> &)> _disconnected_cb;
protected:
virtual void send_beacon(::dsn::host_port node, uint64_t time) override
{
if (_send_ping_switch)
failure_detector::send_beacon(node, time);
else {
LOG_DEBUG("ignore send beacon, to node[{}], time[{}]", node, time);
}
}
virtual void on_master_disconnected(const std::vector<host_port> &nodes) override
{
if (_disconnected_cb)
_disconnected_cb(nodes);
}
virtual void on_master_connected(host_port node) override
{
if (_connected_cb)
_connected_cb(node);
}
public:
worker_fd_test(replication::replica_stub *stub, std::vector<dsn::host_port> &meta_servers)
: slave_failure_detector_with_multimaster(meta_servers,
[=]() { stub->on_meta_server_disconnected(); },
[=]() { stub->on_meta_server_connected(); })
{
_send_ping_switch = false;
}
void toggle_send_ping(bool toggle) { _send_ping_switch = toggle; }
void when_connected(const std::function<void(host_port addr)> &func) { _connected_cb = func; }
void when_disconnected(const std::function<void(const std::vector<host_port> &nodes)> &func)
{
_disconnected_cb = func;
}
void clear()
{
_connected_cb = {};
_disconnected_cb = {};
}
};
class master_fd_test : public replication::meta_server_failure_detector
{
private:
std::function<void(host_port addr)> _connected_cb;
std::function<void(const std::vector<host_port> &)> _disconnected_cb;
volatile bool _response_ping_switch;
public:
virtual void on_ping(const beacon_msg &beacon, ::dsn::rpc_replier<beacon_ack> &reply) override
{
if (_response_ping_switch)
meta_server_failure_detector::on_ping(beacon, reply);
else {
LOG_DEBUG("ignore on ping, beacon msg, time[{}], from[{}], to[{}]",
beacon.time,
beacon.from_node,
beacon.to_node);
}
}
virtual void on_worker_disconnected(const std::vector<host_port> &worker_list) override
{
if (_disconnected_cb)
_disconnected_cb(worker_list);
}
virtual void on_worker_connected(host_port node) override
{
if (_connected_cb)
_connected_cb(node);
}
master_fd_test() : meta_server_failure_detector(host_port(), false)
{
_response_ping_switch = true;
}
void toggle_response_ping(bool toggle) { _response_ping_switch = toggle; }
void when_connected(const std::function<void(host_port addr)> &func) { _connected_cb = func; }
void when_disconnected(const std::function<void(const std::vector<host_port> &nodes)> &func)
{
_disconnected_cb = func;
}
void test_register_worker(host_port node)
{
zauto_lock l(failure_detector::_lock);
register_worker(node);
}
void clear()
{
_connected_cb = {};
_disconnected_cb = {};
}
};
class test_worker : public service_app, public serverlet<test_worker>
{
public:
test_worker(const service_app_info *info) : service_app(info), serverlet("test_worker") {}
error_code start(const std::vector<std::string> &args) override
{
std::vector<host_port> master_group;
for (int i = 0; i < 3; ++i)
master_group.push_back(host_port("localhost", MPORT_START + i));
_worker_fd = new worker_fd_test(nullptr, master_group);
_worker_fd->start(1, 1, 9, 10);
++started_apps;
register_rpc_handler(
RPC_MASTER_CONFIG, "RPC_MASTER_CONFIG", &test_worker::on_master_config);
return ERR_OK;
}
error_code stop(bool) override { return ERR_OK; }
void on_master_config(const config_master_message &request, bool &response)
{
LOG_DEBUG("master config, request: {}, type: {}",
request.master,
request.is_register ? "reg" : "unreg");
host_port hp_master;
GET_HOST_PORT(request, master, hp_master);
if (request.is_register)
_worker_fd->register_master(hp_master);
else
_worker_fd->unregister_master(hp_master);
response = true;
}
worker_fd_test *fd() { return _worker_fd; }
private:
worker_fd_test *_worker_fd;
};
class test_master : public service_app
{
public:
test_master(const service_app_info *info) : ::dsn::service_app(info) {}
error_code start(const std::vector<std::string> &args) override
{
FLAGS_stable_rs_min_running_seconds = 10;
FLAGS_max_succssive_unstable_restart = 10;
_master_fd = new master_fd_test();
_master_fd->set_options(&_opts);
bool use_allow_list = false;
if (args.size() >= 3 && args[1] == "whitelist") {
std::vector<std::string> ports;
utils::split_args(args[2].c_str(), ports, ',');
for (auto &port : ports) {
rpc_address addr(network::get_local_ipv4(), std::stoi(port));
const auto hp = ::dsn::host_port::from_address(addr);
_master_fd->add_allow_list(hp);
}
use_allow_list = true;
}
_master_fd->start(1, 1, 9, 10, use_allow_list);
LOG_DEBUG("{}", _master_fd->get_allow_list({}));
++started_apps;
return ERR_OK;
}
error_code stop(bool) override { return ERR_OK; }
master_fd_test *fd() { return _master_fd; }
private:
master_fd_test *_master_fd;
replication::fd_suboptions _opts;
};
bool spin_wait_condition(const std::function<bool()> &pred, int seconds)
{
for (int i = 0; i != seconds; ++i) {
if (pred())
return true;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
return pred();
}
void fd_test_init()
{
dsn::service_app::register_factory<test_worker>("worker");
dsn::service_app::register_factory<test_master>("master");
srand(time(0));
}
bool get_worker_and_master(test_worker *&worker, std::vector<test_master *> &masters)
{
bool ans = spin_wait_condition([]() { return started_apps == MCOUNT + 1; }, 30);
if (!ans)
return false;
std::vector<service_app *> apps;
service_app::get_all_service_apps(&apps);
masters.resize(MCOUNT, nullptr);
worker = nullptr;
for (int i = 0; i != apps.size(); ++i) {
if (apps[i]->info().type == "worker") {
if (worker != nullptr)
return false;
worker = reinterpret_cast<test_worker *>(apps[i]);
} else if (apps[i]->info().type == "master") {
int index = apps[i]->info().index - 1;
if (index >= masters.size() || masters[index] != nullptr)
return false;
masters[index] = reinterpret_cast<test_master *>(apps[i]);
}
}
for (test_master *m : masters)
if (m == nullptr)
return false;
return true;
}
void master_group_set_leader(std::vector<test_master *> &master_group, int leader_index)
{
const auto hp_leader = host_port("localhost", MPORT_START + leader_index);
int i = 0;
for (test_master *&master : master_group) {
master->fd()->set_leader_for_test(hp_leader, leader_index == i);
i++;
}
}
void worker_set_leader(test_worker *worker, int leader_contact)
{
worker->fd()->set_leader_for_test(host_port("localhost", MPORT_START + leader_contact));
config_master_message msg;
msg.master = rpc_address::from_host_port("localhost", MPORT_START + leader_contact);
msg.is_register = true;
msg.__set_hp_master(host_port::from_address(msg.master));
error_code err;
bool response;
std::tie(err, response) = rpc::call_wait<bool>(
rpc_address::from_host_port("localhost", WPORT), dsn::task_code(RPC_MASTER_CONFIG), msg);
ASSERT_EQ(err, ERR_OK);
}
void clear(test_worker *worker, std::vector<test_master *> masters)
{
const auto &hp_leader = worker->fd()->get_servers().group_host_port()->leader();
const auto &leader = dsn::dns_resolver::instance().resolve_address(hp_leader);
config_master_message msg;
msg.master = leader;
msg.is_register = false;
msg.__set_hp_master(hp_leader);
error_code err;
bool response;
std::tie(err, response) = rpc::call_wait<bool>(
rpc_address::from_host_port("localhost", WPORT), dsn::task_code(RPC_MASTER_CONFIG), msg);
ASSERT_EQ(err, ERR_OK);
worker->fd()->toggle_send_ping(false);
std::for_each(masters.begin(), masters.end(), [](test_master *mst) {
mst->fd()->clear_workers();
mst->fd()->toggle_response_ping(true);
});
}
void finish(test_worker *worker, test_master *master, int master_index)
{
LOG_WARNING("start to finish");
std::atomic_int wait_count;
wait_count.store(2);
worker->fd()->when_disconnected(
[&wait_count, master_index](const std::vector<host_port> &addr_list) mutable {
ASSERT_EQ(addr_list.size(), 1);
ASSERT_EQ(addr_list[0].port(), MPORT_START + master_index);
--wait_count;
});
master->fd()->when_disconnected([&wait_count](const std::vector<host_port> &addr_list) mutable {
ASSERT_EQ(addr_list.size(), 1);
ASSERT_EQ(addr_list[0].port(), WPORT);
--wait_count;
});
// we don't send any ping message now
worker->fd()->toggle_send_ping(false);
ASSERT_TRUE(spin_wait_condition([&wait_count] { return wait_count == 0; }, 20));
worker->fd()->clear();
master->fd()->clear();
}
TEST(fd, dummy_connect_disconnect)
{
test_worker *worker;
std::vector<test_master *> masters;
ASSERT_TRUE(get_worker_and_master(worker, masters));
clear(worker, masters);
// set master with smallest index as the leader
master_group_set_leader(masters, 0);
// set the worker contact leader
worker_set_leader(worker, 0);
test_master *leader = masters[0];
// simply wait for two connected
std::atomic_int wait_count;
wait_count.store(2);
worker->fd()->when_connected([&wait_count](host_port leader) mutable {
ASSERT_EQ(leader.port(), MPORT_START);
--wait_count;
});
leader->fd()->when_connected([&wait_count](host_port worker_addr) mutable {
ASSERT_EQ(worker_addr.port(), WPORT);
--wait_count;
});
worker->fd()->toggle_send_ping(true);
ASSERT_TRUE(spin_wait_condition([&wait_count] { return wait_count == 0; }, 20));
finish(worker, leader, 0);
}
TEST(fd, master_redirect)
{
test_worker *worker;
std::vector<test_master *> masters;
ASSERT_TRUE(get_worker_and_master(worker, masters));
int index = masters.size() - 1;
clear(worker, masters);
/* leader is the last master*/
master_group_set_leader(masters, index);
// we contact to 0
worker_set_leader(worker, 0);
test_master *leader = masters[index];
std::atomic_int wait_count;
wait_count.store(2);
/* although we contact to the first master, but in the end we must connect to the right leader
*/
worker->fd()->when_connected([&wait_count](host_port leader) mutable { --wait_count; });
leader->fd()->when_connected([&wait_count](host_port worker_addr) mutable {
ASSERT_EQ(worker_addr.port(), WPORT);
--wait_count;
});
worker->fd()->toggle_send_ping(true);
ASSERT_TRUE(spin_wait_condition([&wait_count] { return wait_count == 0; }, 20));
// in the end, the worker will connect to the right master
ASSERT_TRUE(spin_wait_condition(
[worker, index] {
return worker->fd()->current_server_contact().port() == MPORT_START + index;
},
20));
finish(worker, leader, index);
}
TEST(fd, switch_new_master_suddenly)
{
test_worker *worker;
std::vector<test_master *> masters;
ASSERT_TRUE(get_worker_and_master(worker, masters));
clear(worker, masters);
test_master *tst_master;
int index = 0;
master_group_set_leader(masters, index);
// and now we contact to 1
worker_set_leader(worker, 1);
tst_master = masters[index];
std::atomic_int wait_count;
wait_count.store(2);
auto cb = [&wait_count](host_port) mutable { --wait_count; };
worker->fd()->when_connected(cb);
tst_master->fd()->when_connected(cb);
worker->fd()->toggle_send_ping(true);
ASSERT_TRUE(spin_wait_condition([&wait_count]() { return wait_count == 0; }, 20));
ASSERT_EQ(worker->fd()->current_server_contact().port(), MPORT_START + index);
worker->fd()->when_connected(nullptr);
/* we select a new leader */
index = masters.size() - 1;
tst_master = masters[index];
/*
* for perfect FD, the new master should assume the worker connected.
* But first we test if the worker can connect to the new master.
* So clear all the workers
*/
tst_master->fd()->clear_workers();
wait_count.store(1);
tst_master->fd()->when_connected([&wait_count](host_port addr) mutable {
ASSERT_EQ(addr.port(), WPORT);
--wait_count;
});
master_group_set_leader(masters, index);
/* now we can worker the worker to connect to the new master */
ASSERT_TRUE(spin_wait_condition([&wait_count]() { return wait_count == 0; }, 20));
/* it may takes time for worker to switch to new master, but 20 seconds
* is enough as in our setting, lease_period is 9 seconds. */
ASSERT_TRUE(spin_wait_condition(
[worker, index]() {
return worker->fd()->current_server_contact().port() == MPORT_START + index;
},
20));
finish(worker, tst_master, index);
}
TEST(fd, old_master_died)
{
test_worker *worker;
std::vector<test_master *> masters;
ASSERT_TRUE(get_worker_and_master(worker, masters));
clear(worker, masters);
test_master *tst_master;
int index = 0;
master_group_set_leader(masters, index);
// and now we contact to 0
worker_set_leader(worker, 0);
tst_master = masters[index];
std::atomic_int wait_count;
wait_count.store(2);
auto cb = [&wait_count](host_port) mutable { --wait_count; };
worker->fd()->when_connected(cb);
tst_master->fd()->when_connected(cb);
worker->fd()->toggle_send_ping(true);
ASSERT_TRUE(spin_wait_condition([&wait_count]() -> bool { return wait_count == 0; }, 20));
ASSERT_EQ(worker->fd()->current_server_contact().port(), MPORT_START + index);
worker->fd()->when_connected(nullptr);
tst_master->fd()->when_connected(nullptr);
worker->fd()->when_disconnected([](const std::vector<host_port> &masters_list) {
ASSERT_EQ(masters_list.size(), 1);
LOG_DEBUG("disconnect from master: {}", masters_list[0]);
});
/*first let's stop the old master*/
tst_master->fd()->toggle_response_ping(false);
/* then select a new one */
index = masters.size() - 1;
tst_master = masters[index];
/* only for test */
tst_master->fd()->clear_workers();
wait_count.store(1);
tst_master->fd()->when_connected([&wait_count](host_port addr) mutable {
EXPECT_EQ(addr.port(), WPORT);
--wait_count;
});
master_group_set_leader(masters, index);
/* now we can wait the worker to connect to the new master */
ASSERT_TRUE(spin_wait_condition([&wait_count]() { return wait_count == 0; }, 20));
/* it may takes time for worker to switch to new master, but 20 seconds
* is enough as in our setting, lease_period is 9 seconds. */
ASSERT_TRUE(spin_wait_condition(
[worker, index]() {
return worker->fd()->current_server_contact().port() == MPORT_START + index;
},
20));
finish(worker, tst_master, index);
}
TEST(fd, worker_died_when_switch_master)
{
test_worker *worker;
std::vector<test_master *> masters;
ASSERT_TRUE(get_worker_and_master(worker, masters));
clear(worker, masters);
test_master *tst_master;
int index = 0;
master_group_set_leader(masters, index);
// and now we contact to 0
worker_set_leader(worker, 0);
tst_master = masters[index];
std::atomic_int wait_count;
wait_count.store(2);
auto cb = [&wait_count](host_port) mutable { --wait_count; };
worker->fd()->when_connected(cb);
tst_master->fd()->when_connected(cb);
worker->fd()->toggle_send_ping(true);
ASSERT_TRUE(spin_wait_condition([&wait_count]() { return wait_count == 0; }, 20));
ASSERT_EQ(worker->fd()->current_server_contact().port(), MPORT_START + index);
worker->fd()->when_connected(nullptr);
tst_master->fd()->when_connected(nullptr);
/*first stop the old leader*/
tst_master->fd()->toggle_response_ping(false);
/*then select another leader*/
index = masters.size() - 1;
tst_master = masters[index];
wait_count.store(2);
tst_master->fd()->when_disconnected(
[&wait_count](const std::vector<host_port> &worker_list) mutable {
ASSERT_EQ(worker_list.size(), 1);
ASSERT_EQ(worker_list[0].port(), WPORT);
wait_count--;
});
worker->fd()->when_disconnected(
[&wait_count](const std::vector<host_port> &master_list) mutable {
ASSERT_EQ(master_list.size(), 1);
wait_count--;
});
/* we assume the worker is alive */
tst_master->fd()->test_register_worker(host_port("localhost", WPORT));
master_group_set_leader(masters, index);
/* then stop the worker*/
worker->fd()->toggle_send_ping(false);
ASSERT_TRUE(spin_wait_condition([&wait_count] { return wait_count == 0; }, 20));
}
dsn::message_ex *create_fake_rpc_response()
{
dsn::message_ex *req =
dsn::message_ex::create_received_request(RPC_MASTER_CONFIG, DSF_THRIFT_BINARY, nullptr, 0);
dsn::message_ex *response = req->create_response();
req->add_ref();
req->release_ref();
return response;
}
TEST(fd, update_stability)
{
test_worker *worker;
std::vector<test_master *> masters;
ASSERT_TRUE(get_worker_and_master(worker, masters));
clear(worker, masters);
master_group_set_leader(masters, 0);
master_fd_test *fd = masters[0]->fd();
fd->toggle_response_ping(true);
replication::fd_suboptions opts;
FLAGS_stable_rs_min_running_seconds = 5;
FLAGS_max_succssive_unstable_restart = 2;
fd->set_options(&opts);
replication::meta_server_failure_detector::stability_map *smap =
fd->get_stability_map_for_test();
smap->clear();
dsn::rpc_replier<beacon_ack> r(create_fake_rpc_response());
beacon_msg msg;
msg.from_node = rpc_address::from_host_port("localhost", 123);
msg.__set_hp_from_node(host_port("localhost", 123));
msg.to_node = rpc_address::from_host_port("localhost", MPORT_START);
msg.__set_hp_to_node(host_port("localhost", MPORT_START));
msg.time = dsn_now_ms();
msg.__isset.start_time = true;
msg.start_time = 1000;
// first on ping
fd->on_ping(msg, r);
ASSERT_EQ(1, smap->size());
ASSERT_NE(smap->end(), smap->find(msg.hp_from_node));
replication::meta_server_failure_detector::worker_stability &ws =
smap->find(msg.hp_from_node)->second;
ASSERT_EQ(0, ws.unstable_restart_count);
ASSERT_EQ(msg.start_time, ws.last_start_time_ms);
ASSERT_TRUE(r.is_empty());
r = dsn::rpc_replier<beacon_ack>(create_fake_rpc_response());
// unstably restart and resend ping
msg.start_time += 4000;
fd->on_ping(msg, r);
ASSERT_EQ(1, ws.unstable_restart_count);
ASSERT_EQ(msg.start_time, ws.last_start_time_ms);
ASSERT_TRUE(r.is_empty());
r = dsn::rpc_replier<beacon_ack>(create_fake_rpc_response());
// upstably restart and resend ping again, the node's ping will be ignored
msg.start_time += 4000;
fd->on_ping(msg, r);
ASSERT_EQ(msg.start_time, ws.last_start_time_ms);
ASSERT_EQ(2, ws.unstable_restart_count);
ASSERT_FALSE(r.is_empty());
// stably restart, the meta stability & unstable_count will be reset
msg.start_time += 10000;
fd->on_ping(msg, r);
ASSERT_EQ(msg.start_time, ws.last_start_time_ms);
ASSERT_EQ(0, ws.unstable_restart_count);
ASSERT_TRUE(r.is_empty());
r = dsn::rpc_replier<beacon_ack>(create_fake_rpc_response());
// unstably restart, unstable-count++
msg.start_time += 4000;
fd->on_ping(msg, r);
ASSERT_EQ(msg.start_time, ws.last_start_time_ms);
ASSERT_EQ(1, ws.unstable_restart_count);
ASSERT_TRUE(r.is_empty());
r = dsn::rpc_replier<beacon_ack>(create_fake_rpc_response());
// not restart, unstable-count will be reset
fd->on_ping(msg, r);
ASSERT_EQ(msg.start_time, ws.last_start_time_ms);
ASSERT_EQ(0, ws.unstable_restart_count);
ASSERT_TRUE(r.is_empty());
r = dsn::rpc_replier<beacon_ack>(create_fake_rpc_response());
// old message, will be ignored
msg.start_time -= 4000;
fd->on_ping(msg, r);
ASSERT_EQ(msg.start_time + 4000, ws.last_start_time_ms);
ASSERT_EQ(0, ws.unstable_restart_count);
ASSERT_TRUE(r.is_empty());
r = dsn::rpc_replier<beacon_ack>(create_fake_rpc_response());
// unstable restart, unstable-count++
msg.start_time += 8000;
fd->on_ping(msg, r);
ASSERT_EQ(msg.start_time, ws.last_start_time_ms);
ASSERT_EQ(1, ws.unstable_restart_count);
ASSERT_TRUE(r.is_empty());
r = dsn::rpc_replier<beacon_ack>(create_fake_rpc_response());
// unstable restart, unstable-count++, node's ping will be ignored
msg.start_time += 4000;
fd->on_ping(msg, r);
ASSERT_EQ(msg.start_time, ws.last_start_time_ms);
ASSERT_EQ(2, ws.unstable_restart_count);
ASSERT_FALSE(r.is_empty());
// reset stat
fd->reset_stability_stat(msg.hp_from_node);
ASSERT_EQ(msg.start_time, ws.last_start_time_ms);
ASSERT_EQ(0, ws.unstable_restart_count);
}
TEST(fd, not_in_whitelist)
{
test_worker *worker;
std::vector<test_master *> masters;
ASSERT_TRUE(get_worker_and_master(worker, masters));
clear(worker, masters);
// set master with smallest index as the leader
master_group_set_leader(masters, 0);
// set the worker contact leader
worker_set_leader(worker, 0);
std::atomic_int wait_count;
wait_count.store(1);
auto cb = [&wait_count](host_port) mutable { --wait_count; };
worker->fd()->when_connected(cb);
worker->fd()->toggle_send_ping(true);
ASSERT_TRUE(spin_wait_condition([&wait_count] { return wait_count == 1; }, 20));
}