// blob: 25c16d675767104b356719bfc6abf181d05a3534 [file] [log] [blame]
/*
* The MIT License (MIT)
*
* Copyright (c) 2015 Microsoft Corporation
*
* -=- Robust Distributed System Nucleus (rDSN) -=-
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#include "misc.h"
#include <boost/lexical_cast.hpp>
// IWYU pragma: no_include <ext/alloc_traits.h>
#include <stdio.h>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstdlib>
#include <iostream>
#include <set>
#include <string>
#include <thread>
#include <unordered_map>
#include "common/fs_manager.h"
#include "common/gpid.h"
#include "common/replication_enums.h"
#include "common/replication_other_types.h"
#include "dsn.layer2_types.h"
#include "duplication_types.h"
#include "meta_admin_types.h"
#include "metadata_types.h"
#include "utils/fmt_logging.h"
#include "utils/rand.h"
using namespace dsn::replication;
// Returns a pseudo-random value derived from rand() and mapped into the closed
// interval [min, max].
//
// The interval width is computed in 32-bit unsigned arithmetic, so it wraps to
// zero when the interval covers every uint32_t value (for example
// [0, UINT32_MAX]) or when min == max + 1. Taking a remainder by that zero
// width would be undefined behaviour, so that case is handled separately.
// For every other input the result is the same as before.
uint32_t random32(uint32_t min, uint32_t max)
{
    const uint32_t span = max - min + 1;
    const uint32_t raw = static_cast<uint32_t>(rand());
    if (span == 0) {
        // Any 32-bit value is acceptable here; unsigned addition simply wraps.
        return raw + min;
    }
    return raw % span + min;
}
// Resizes output_list to a length drawn from random32(min_count, max_count)
// and assigns every element the IPv4 address 127.0.0.1. The second argument
// passed to assign_ipv4 (presumably the port) is the element's 1-based
// position in the list.
void generate_node_list(std::vector<dsn::rpc_address> &output_list, int min_count, int max_count)
{
    const int node_total = random32(min_count, max_count);
    output_list.resize(node_total);

    int ordinal = 1;
    for (dsn::rpc_address &node : output_list) {
        node.assign_ipv4("127.0.0.1", ordinal);
        ++ordinal;
    }
}
// Dumps the app table to stdout.
//
// Output layout:
//   <number of apps>
//   <map key> <partition_count>          (one line per app)
//   <replica count> <primary> <secondary>...   (one line per partition)
// where <replica count> is the number of secondaries plus one.
void verbose_apps(const app_mapper &input_apps)
{
    std::cout << input_apps.size() << std::endl;
    for (const auto &entry : input_apps) {
        const std::shared_ptr<app_state> &app = entry.second;
        std::cout << entry.first << " " << app->partition_count << std::endl;

        for (int idx = 0; idx < app->partition_count; ++idx) {
            const auto &pc = app->partitions[idx];
            std::cout << pc.secondaries.size() + 1 << " " << pc.primary.to_string();
            for (const auto &secondary : pc.secondaries) {
                std::cout << " " << secondary.to_string();
            }
            std::cout << std::endl;
        }
    }
}
// Rebuilds output_nodes from scratch.
//
// Every address in input_node_list is looked up via get_node_state (with its
// third argument set to true) and marked alive. Then each partition of each
// app is registered on the node state of its primary (when the primary is
// valid) and of each of its secondaries. A secondary with an invalid address
// fails a CHECK.
void generate_node_mapper(
    /*out*/ node_mapper &output_nodes,
    const app_mapper &input_apps,
    const std::vector<dsn::rpc_address> &input_node_list)
{
    output_nodes.clear();

    for (const auto &node : input_node_list) {
        get_node_state(output_nodes, node, true)->set_alive(true);
    }

    for (const auto &entry : input_apps) {
        for (const dsn::partition_configuration &pc : entry.second->partitions) {
            // A partition may currently have no primary; skip it in that case.
            if (!pc.primary.is_invalid()) {
                get_node_state(output_nodes, pc.primary, true)->put_partition(pc.pid, true);
            }
            for (const dsn::rpc_address &sec : pc.secondaries) {
                CHECK(!sec.is_invalid(), "");
                get_node_state(output_nodes, sec, true)->put_partition(pc.pid, false);
            }
        }
    }
}
// Assigns a random ballot and a random replica placement to every partition
// of app.
//
// For each partition, three strictly increasing indices into node_list are
// drawn, so the three chosen nodes are distinct. One of them (chosen at
// random) becomes the primary and the other two become the secondaries.
//
// node_list must contain at least three nodes. With fewer, the index bounds
// below would underflow (size() is unsigned) and the lookups would read out of
// range, so the requirement is checked explicitly.
void generate_app(/*out*/ std::shared_ptr<app_state> &app,
                  const std::vector<dsn::rpc_address> &node_list)
{
    for (dsn::partition_configuration &pc : app->partitions) {
        CHECK(node_list.size() >= 3,
              "at least 3 nodes are required to place a partition, but got {}",
              node_list.size());

        pc.ballot = random32(1, 10000);

        // Each upper bound leaves enough room for the indices still to be drawn.
        std::vector<int> indices(3, 0);
        indices[0] = random32(0, node_list.size() - 3);
        indices[1] = random32(indices[0] + 1, node_list.size() - 2);
        indices[2] = random32(indices[1] + 1, node_list.size() - 1);

        // Position (within indices) of the replica that becomes the primary.
        const unsigned int p = random32(0, 2);
        pc.primary = node_list[indices[p]];
        pc.secondaries.clear();
        for (unsigned int i = 0; i != indices.size(); ++i) {
            if (i != p) {
                pc.secondaries.push_back(node_list[indices[i]]);
            }
        }

        CHECK(!pc.primary.is_invalid(), "");
        CHECK(!is_secondary(pc, pc.primary), "");
        CHECK_EQ(pc.secondaries.size(), 2);
        CHECK_NE(pc.secondaries[0], pc.secondaries[1]);
    }
}
// Records, for every replica of every partition of app, a serving-replica
// entry whose disk tag is "disk<k>", where k comes from
// dsn::rand::next_u32(1, total_disks). The primary's tag is drawn first,
// followed by the secondaries in their stored order.
void generate_app_serving_replica_info(/*out*/ std::shared_ptr<dsn::replication::app_state> &app,
                                       int total_disks)
{
    char tag_buf[256];
    // Formats a freshly drawn disk tag into tag_buf and returns the buffer.
    const auto random_disk_tag = [&tag_buf, total_disks]() -> const char * {
        snprintf(tag_buf, sizeof(tag_buf), "disk%u", dsn::rand::next_u32(1, total_disks));
        return tag_buf;
    };

    for (int idx = 0; idx < app->partition_count; ++idx) {
        config_context &cc = app->helpers->contexts[idx];
        dsn::partition_configuration &pc = app->partitions[idx];

        // The same replica_info object is reused; only its disk tag changes.
        replica_info ri;
        ri.disk_tag = random_disk_tag();
        cc.collect_serving_replica(pc.primary, ri);

        for (const dsn::rpc_address &secondary : pc.secondaries) {
            ri.disk_tag = random_disk_tag();
            cc.collect_serving_replica(secondary, ri);
        }
    }
}
// Replaces the contents of mapper with apps_count generated apps.
//
// Apps get ids 1..apps_count, the name "test_app<id>", type "test", status
// AS_AVAILABLE, max_replica_count 3 and a partition count drawn from
// random32 over partitions_range. Replica placement comes from generate_app.
// When generate_serving_info is set, serving-replica info is also generated
// with disks_per_node as the disk count.
void generate_apps(/*out*/ dsn::replication::app_mapper &mapper,
                   const std::vector<dsn::rpc_address> &node_list,
                   int apps_count,
                   int disks_per_node,
                   std::pair<uint32_t, uint32_t> partitions_range,
                   bool generate_serving_info)
{
    mapper.clear();

    dsn::app_info info;
    for (int id = 1; id <= apps_count; ++id) {
        info.app_id = id;
        info.app_name = "test_app" + boost::lexical_cast<std::string>(id);
        info.app_type = "test";
        info.status = dsn::app_status::AS_AVAILABLE;
        info.is_stateful = true;
        info.max_replica_count = 3;
        info.partition_count = random32(partitions_range.first, partitions_range.second);

        std::shared_ptr<app_state> the_app = app_state::create(info);
        generate_app(the_app, node_list);
        if (generate_serving_info) {
            generate_app_serving_replica_info(the_app, disks_per_node);
        }

        LOG_DEBUG("generated app, partitions({})", info.partition_count);
        mapper.emplace(the_app->app_id, the_app);
    }
}
// Rebuilds nfm so that every node in nodes has an fs_manager.
//
// Each manager is initialized with total_disks data directories
// "/home/work/disk<k>" tagged "disk<k>" (k = 1..total_disks). Every partition
// held by the node is then added to it under the path
// "/home/work/<disk_tag>/<app_id>.<partition_index>.test", where <disk_tag> is
// read from the partition's serving-replica record for that node.
void generate_node_fs_manager(const app_mapper &apps,
                              const node_mapper &nodes,
                              /*out*/ nodes_fs_manager &nfm,
                              int total_disks)
{
    nfm.clear();

    const char *prefix = "/home/work/";
    std::vector<std::string> data_dirs(total_disks);
    std::vector<std::string> tags(total_disks);
    for (size_t slot = 0; slot < data_dirs.size(); ++slot) {
        // Disks are numbered starting from 1.
        tags[slot] = "disk" + std::to_string(slot + 1);
        data_dirs[slot] = prefix + tags[slot];
    }

    char pid_dir[256];
    for (const auto &entry : nodes) {
        const node_state &ns = entry.second;

        auto slot = nfm.find(ns.addr());
        if (slot == nfm.end()) {
            slot = nfm.emplace(ns.addr(), std::make_shared<fs_manager>()).first;
        }
        fs_manager &manager = *slot->second;
        manager.initialize(data_dirs, tags);

        ns.for_each_partition([&](const dsn::gpid &pid) {
            const config_context &cc = *get_config_context(apps, pid);
            snprintf(pid_dir,
                     sizeof(pid_dir),
                     "%s%s/%d.%d.test",
                     prefix,
                     cc.find_from_serving(ns.addr())->disk_tag.c_str(),
                     pid.get_app_id(),
                     pid.get_partition_index());
            LOG_DEBUG("concat pid_dir({}) of node({})", pid_dir, ns.addr());
            manager.add_replica(pid, pid_dir);
            return true;
        });
    }
}
// Mirrors a proposal action onto the simulated per-node disk bookkeeping.
//
// - CT_ASSIGN_PRIMARY: picks a directory on act.target's fs_manager, records
//   pid under it and registers act.target as serving the partition.
// - CT_ADD_SECONDARY / CT_ADD_SECONDARY_FOR_LB: same, but on act.node.
// - CT_DOWNGRADE_TO_SECONDARY / CT_UPGRADE_TO_PRIMARY: nothing to do.
// - CT_REMOVE / CT_DOWNGRADE_TO_INACTIVE: removes the replica from act.node's
//   fs_manager and from the partition's serving list.
// Any other action type fails a CHECK. The config context of pid and the
// fs_managers of both act.target and act.node must exist for every action
// type. The node_mapper parameter is unused.
void track_disk_info_check_and_apply(const dsn::replication::configuration_proposal_action &act,
                                     const dsn::gpid &pid,
                                     /*in-out*/ dsn::replication::app_mapper &apps,
                                     /*in-out*/ dsn::replication::node_mapper & /*nodes*/,
                                     /*in-out*/ nodes_fs_manager &manager)
{
    config_context *cc = get_config_context(apps, pid);
    CHECK_NOTNULL(cc, "");

    fs_manager *target_manager = get_fs_manager(manager, act.target);
    CHECK_NOTNULL(target_manager, "");
    fs_manager *node_manager = get_fs_manager(manager, act.node);
    CHECK_NOTNULL(node_manager, "");

    // NOTE(review): ri is handed to collect_serving_replica default-constructed,
    // i.e. without the tag of the directory selected below - confirm that this
    // is intended.
    replica_info ri;
    switch (act.type) {
    case config_type::CT_ASSIGN_PRIMARY: {
        auto selected = target_manager->find_best_dir_for_new_replica(pid);
        CHECK_NOTNULL(selected, "");
        selected->holding_replicas[pid.get_app_id()].emplace(pid);
        cc->collect_serving_replica(act.target, ri);
        break;
    }
    case config_type::CT_ADD_SECONDARY:
    case config_type::CT_ADD_SECONDARY_FOR_LB: {
        auto selected = node_manager->find_best_dir_for_new_replica(pid);
        CHECK_NOTNULL(selected, "");
        selected->holding_replicas[pid.get_app_id()].emplace(pid);
        cc->collect_serving_replica(act.node, ri);
        break;
    }
    case config_type::CT_DOWNGRADE_TO_SECONDARY:
    case config_type::CT_UPGRADE_TO_PRIMARY:
        // Role changes do not move any data between disks.
        break;
    case config_type::CT_REMOVE:
    case config_type::CT_DOWNGRADE_TO_INACTIVE:
        node_manager->remove_replica(pid);
        cc->remove_from_serving(act.node);
        break;
    default:
        CHECK(false, "");
        break;
    }
}
// Validates a single proposal action against the current configuration of
// partition pid and, if it is legal, applies it to both the partition
// configuration (apps) and the per-node partition bookkeeping (nodes).
//
// The partition's ballot is incremented for every action. When manager is
// non-null, the disk bookkeeping is updated first through
// track_disk_info_check_and_apply. Any violated expectation fails a CHECK.
//
// NOTE(review): several branches use nodes[act.node]; if node_mapper is a
// standard map, operator[] creates a default entry for an unknown node -
// confirm whether that is intended where no prior find() check exists.
void proposal_action_check_and_apply(const configuration_proposal_action &act,
const dsn::gpid &pid,
app_mapper &apps,
node_mapper &nodes,
nodes_fs_manager *manager)
{
dsn::partition_configuration &pc = *get_config(apps, pid);
node_state *ns;
++pc.ballot;
// Preconditions shared by every action type.
CHECK_NE(act.type, config_type::CT_INVALID);
CHECK(!act.target.is_invalid(), "");
CHECK(!act.node.is_invalid(), "");
// Disk tracking is optional and runs before the configuration is changed.
if (manager) {
track_disk_info_check_and_apply(act, pid, apps, nodes, *manager);
}
switch (act.type) {
// Only legal on a partition that has neither a primary nor secondaries.
case config_type::CT_ASSIGN_PRIMARY:
CHECK_EQ(act.node, act.target);
CHECK(pc.primary.is_invalid(), "");
CHECK(pc.secondaries.empty(), "");
pc.primary = act.node;
ns = &nodes[act.node];
CHECK_EQ(ns->served_as(pc.pid), partition_status::PS_INACTIVE);
ns->put_partition(pc.pid, true);
break;
// The target must be the current primary and the node must not be a member yet.
case config_type::CT_ADD_SECONDARY:
CHECK_EQ(act.target, pc.primary);
CHECK(!is_member(pc, act.node), "");
pc.secondaries.push_back(act.node);
ns = &nodes[act.node];
CHECK_EQ(ns->served_as(pc.pid), partition_status::PS_INACTIVE);
ns->put_partition(pc.pid, false);
break;
// The current primary is appended to the secondaries, leaving the partition
// without a primary.
case config_type::CT_DOWNGRADE_TO_SECONDARY:
CHECK_EQ(act.node, act.target);
CHECK_EQ(act.node, pc.primary);
CHECK(nodes.find(act.node) != nodes.end(), "");
CHECK(!is_secondary(pc, pc.primary), "");
nodes[act.node].remove_partition(pc.pid, true);
pc.secondaries.push_back(pc.primary);
pc.primary.set_invalid();
break;
// Only legal while there is no primary; the node must currently be a secondary.
case config_type::CT_UPGRADE_TO_PRIMARY:
CHECK(pc.primary.is_invalid(), "");
CHECK_EQ(act.node, act.target);
CHECK(is_secondary(pc, act.node), "");
CHECK(nodes.find(act.node) != nodes.end(), "");
ns = &nodes[act.node];
pc.primary = act.node;
// The removal from the secondary list happens inside the CHECK expression.
CHECK(replica_helper::remove_node(act.node, pc.secondaries), "");
ns->put_partition(pc.pid, true);
break;
// Like CT_ADD_SECONDARY, but the node's status is verified after the partition
// has been recorded on it rather than before.
case config_type::CT_ADD_SECONDARY_FOR_LB:
CHECK_EQ(act.target, pc.primary);
CHECK(!is_member(pc, act.node), "");
CHECK(!act.node.is_invalid(), "");
pc.secondaries.push_back(act.node);
ns = &nodes[act.node];
ns->put_partition(pc.pid, false);
CHECK_EQ(ns->served_as(pc.pid), partition_status::PS_SECONDARY);
break;
// In the balancer, removing the primary is not allowed: the removed node must
// be a secondary and the target must be the (valid) primary.
case config_type::CT_REMOVE:
case config_type::CT_DOWNGRADE_TO_INACTIVE:
CHECK(!pc.primary.is_invalid(), "");
CHECK_EQ(pc.primary, act.target);
CHECK(is_secondary(pc, act.node), "");
CHECK(nodes.find(act.node) != nodes.end(), "");
// The removal from the secondary list happens inside the CHECK expression.
CHECK(replica_helper::remove_node(act.node, pc.secondaries), "");
ns = &nodes[act.node];
CHECK_EQ(ns->served_as(pc.pid), partition_status::PS_SECONDARY);
ns->remove_partition(pc.pid, false);
break;
default:
CHECK(false, "");
break;
}
}
// Validates and applies every proposal in ml, in iteration order.
//
// Before its actions run, each proposal must refer to an existing app and a
// partition index within range, and that partition must have a valid primary
// plus exactly two valid secondaries that do not include the primary. Each
// action is then handed to proposal_action_check_and_apply; manager is
// forwarded unchanged and may be null.
void migration_check_and_apply(app_mapper &apps,
                               node_mapper &nodes,
                               migration_list &ml,
                               nodes_fs_manager *manager)
{
    int round = 0;
    for (auto &kv : ml) {
        std::shared_ptr<configuration_balancer_request> &proposal = kv.second;
        // The counter is advanced outside the logging call so that it does not
        // depend on whether the macro evaluates its arguments.
        LOG_DEBUG("the {}th round of proposal, gpid({})", round, proposal->gpid);
        ++round;

        // Fail with a clear message instead of dereferencing end() when the
        // proposal names an app that is not in the mapper.
        const auto app_iter = apps.find(proposal->gpid.get_app_id());
        CHECK(app_iter != apps.end(),
              "app({}) referenced by the proposal does not exist",
              proposal->gpid.get_app_id());
        std::shared_ptr<app_state> &the_app = app_iter->second;

        CHECK_EQ(proposal->gpid.get_app_id(), the_app->app_id);
        CHECK_LT(proposal->gpid.get_partition_index(), the_app->partition_count);
        dsn::partition_configuration &pc =
            the_app->partitions[proposal->gpid.get_partition_index()];

        CHECK(!pc.primary.is_invalid(), "");
        CHECK_EQ(pc.secondaries.size(), 2);
        for (auto &addr : pc.secondaries) {
            CHECK(!addr.is_invalid(), "");
        }
        CHECK(!is_secondary(pc, pc.primary), "");

        for (unsigned int j = 0; j < proposal->action_list.size(); ++j) {
            configuration_proposal_action &act = proposal->action_list[j];
            LOG_DEBUG("the {}th round of action, type: {}, node: {}, target: {}",
                      j,
                      dsn::enum_to_string(act.type),
                      act.node,
                      act.target);
            proposal_action_check_and_apply(act, proposal->gpid, apps, nodes, manager);
        }
    }
}
// Checks that two app mappers describe the same set of apps.
//
// Both must have the same size, and every app of mapper1 must be present in
// mapper2 (looked up by app_id) with an identical id, name, type and status.
// The status has to be AS_AVAILABLE or AS_DROPPED; for available apps the
// partition counts and every partition configuration must match as well.
// Any mismatch fails a CHECK.
void app_mapper_compare(const app_mapper &mapper1, const app_mapper &mapper2)
{
    CHECK_EQ(mapper1.size(), mapper2.size());
    for (const auto &entry : mapper1) {
        const std::shared_ptr<app_state> &app1 = entry.second;

        const auto peer = mapper2.find(app1->app_id);
        CHECK(peer != mapper2.end(), "");
        const std::shared_ptr<app_state> &app2 = peer->second;

        CHECK_EQ(app1->app_id, app2->app_id);
        CHECK_EQ(app1->app_name, app2->app_name);
        CHECK_EQ(app1->app_type, app2->app_type);
        CHECK_EQ(app1->status, app2->status);
        CHECK(app1->status == dsn::app_status::AS_AVAILABLE ||
                  app1->status == dsn::app_status::AS_DROPPED,
              "");

        // Partition layouts are compared for available apps only.
        if (app1->status != dsn::app_status::AS_AVAILABLE) {
            continue;
        }
        CHECK_EQ(app1->partition_count, app2->partition_count);
        for (unsigned int idx = 0; idx < app1->partition_count; ++idx) {
            CHECK(is_partition_config_equal(app1->partitions[idx], app2->partitions[idx]), "");
        }
    }
}
// Polls pred roughly once per second until it returns true or `seconds`
// one-second waits have elapsed.
//
// Returns true as soon as pred() holds. After the last wait, pred is
// evaluated one final time and that result is returned. A zero or negative
// `seconds` performs no waiting and simply returns pred().
bool spin_wait_condition(const std::function<bool()> &pred, int seconds)
{
    // `<` rather than `!=` so that a negative timeout cannot spin indefinitely.
    for (int elapsed = 0; elapsed < seconds; ++elapsed) {
        std::atomic_thread_fence(std::memory_order_seq_cst);
        if (pred()) {
            return true;
        }
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
    return pred();
}