blob: 6a33cd30a709a96d59c8bb3439e4341369556089 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "cluster_balance_policy.h"
#include <limits.h>
#include <stdlib.h>
#include <cstdint>
#include <functional>
#include <iterator>
#include <unordered_map>
#include "dsn.layer2_types.h"
#include "meta/load_balance_policy.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"
#include "utils/utils.h"
DSN_DEFINE_uint32(meta_server,
balance_op_count_per_round,
10,
"balance operation count per round for cluster balancer");
DSN_TAG_VARIABLE(balance_op_count_per_round, FT_MUTABLE);
namespace dsn {
namespace replication {
class meta_service;
uint32_t get_partition_count(const node_state &ns, balance_type type, int32_t app_id)
{
unsigned count = 0;
switch (type) {
case balance_type::COPY_SECONDARY:
if (app_id > 0) {
count = ns.partition_count(app_id) - ns.primary_count(app_id);
} else {
count = ns.partition_count() - ns.primary_count();
}
break;
case balance_type::COPY_PRIMARY:
if (app_id > 0) {
count = ns.primary_count(app_id);
} else {
count = ns.primary_count();
}
break;
default:
break;
}
return (uint32_t)count;
}
uint32_t get_skew(const std::map<rpc_address, uint32_t> &count_map)
{
uint32_t min = UINT_MAX, max = 0;
for (const auto &kv : count_map) {
if (kv.second < min) {
min = kv.second;
}
if (kv.second > max) {
max = kv.second;
}
}
return max - min;
}
void get_min_max_set(const std::map<rpc_address, uint32_t> &node_count_map,
/*out*/ std::set<rpc_address> &min_set,
/*out*/ std::set<rpc_address> &max_set)
{
std::multimap<uint32_t, rpc_address> count_multimap = utils::flip_map(node_count_map);
auto range = count_multimap.equal_range(count_multimap.begin()->first);
for (auto iter = range.first; iter != range.second; ++iter) {
min_set.insert(iter->second);
}
range = count_multimap.equal_range(count_multimap.rbegin()->first);
for (auto iter = range.first; iter != range.second; ++iter) {
max_set.insert(iter->second);
}
}
cluster_balance_policy::cluster_balance_policy(meta_service *svc) : load_balance_policy(svc) {}
void cluster_balance_policy::balance(bool checker,
const meta_view *global_view,
migration_list *list)
{
init(global_view, list);
if (!execute_balance(*_global_view->apps,
false, /* balance_checker */
true, /* balance_in_turn */
true, /* only_move_primary */
std::bind(&cluster_balance_policy::primary_balance,
this,
std::placeholders::_1,
std::placeholders::_2))) {
return;
}
bool need_continue =
cluster_replica_balance(_global_view, balance_type::COPY_SECONDARY, *_migration_result);
if (!need_continue) {
return;
}
cluster_replica_balance(_global_view, balance_type::COPY_PRIMARY, *_migration_result);
}
bool cluster_balance_policy::cluster_replica_balance(const meta_view *global_view,
const balance_type type,
/*out*/ migration_list &list)
{
bool enough_information = do_cluster_replica_balance(global_view, type, list);
if (!enough_information) {
return false;
}
if (!list.empty()) {
LOG_INFO("migration count of {} = {}", enum_to_string(type), list.size());
return false;
}
return true;
}
bool cluster_balance_policy::do_cluster_replica_balance(const meta_view *global_view,
const balance_type type,
/*out*/ migration_list &list)
{
cluster_migration_info cluster_info;
if (!get_cluster_migration_info(global_view, type, cluster_info)) {
return false;
}
partition_set selected_pid;
move_info next_move;
while (get_next_move(cluster_info, selected_pid, next_move)) {
if (!apply_move(next_move, selected_pid, list, cluster_info)) {
break;
}
if (list.size() >= FLAGS_balance_op_count_per_round) {
break;
}
}
return true;
}
bool cluster_balance_policy::get_cluster_migration_info(
const meta_view *global_view,
const balance_type type,
/*out*/ cluster_migration_info &cluster_info)
{
const node_mapper &nodes = *global_view->nodes;
if (nodes.size() < 3) {
return false;
}
const app_mapper &all_apps = *global_view->apps;
app_mapper apps;
for (const auto &kv : all_apps) {
const std::shared_ptr<app_state> &app = kv.second;
auto ignored = is_ignored_app(app->app_id);
if (ignored || app->is_bulk_loading || app->splitting()) {
LOG_INFO("skip to balance app({}), ignored={}, bulk loading={}, splitting={}",
app->app_name,
ignored,
app->is_bulk_loading,
app->splitting());
continue;
}
if (app->status == app_status::AS_AVAILABLE) {
apps[app->app_id] = app;
}
}
for (const auto &kv : apps) {
std::shared_ptr<app_state> app = kv.second;
app_migration_info info;
if (!get_app_migration_info(app, nodes, type, info)) {
return false;
}
cluster_info.apps_info.emplace(kv.first, std::move(info));
cluster_info.apps_skew[kv.first] = get_skew(info.replicas_count);
}
for (const auto &kv : nodes) {
const node_state &ns = kv.second;
node_migration_info info;
get_node_migration_info(ns, apps, info);
cluster_info.nodes_info.emplace(kv.first, std::move(info));
auto count = get_partition_count(ns, type, -1);
cluster_info.replicas_count[kv.first] = count;
}
cluster_info.type = type;
return true;
}
bool cluster_balance_policy::get_app_migration_info(std::shared_ptr<app_state> app,
const node_mapper &nodes,
const balance_type type,
app_migration_info &info)
{
info.app_id = app->app_id;
info.app_name = app->app_name;
info.partitions.resize(app->partitions.size());
for (auto i = 0; i < app->partitions.size(); ++i) {
std::map<rpc_address, partition_status::type> pstatus_map;
pstatus_map[app->partitions[i].primary] = partition_status::PS_PRIMARY;
if (app->partitions[i].secondaries.size() != app->partitions[i].max_replica_count - 1) {
// partition is unhealthy
return false;
}
for (const auto &addr : app->partitions[i].secondaries) {
pstatus_map[addr] = partition_status::PS_SECONDARY;
}
info.partitions[i] = pstatus_map;
}
for (const auto &it : nodes) {
const node_state &ns = it.second;
auto count = get_partition_count(ns, type, app->app_id);
info.replicas_count[ns.addr()] = count;
}
return true;
}
void cluster_balance_policy::get_node_migration_info(const node_state &ns,
const app_mapper &apps,
/*out*/ node_migration_info &info)
{
info.address = ns.addr();
for (const auto &iter : apps) {
std::shared_ptr<app_state> app = iter.second;
for (const auto &context : app->helpers->contexts) {
std::string disk_tag;
if (!context.get_disk_tag(ns.addr(), disk_tag)) {
continue;
}
auto pid = context.config_owner->pid;
if (info.partitions.find(disk_tag) != info.partitions.end()) {
info.partitions[disk_tag].insert(pid);
} else {
partition_set pset;
pset.insert(pid);
info.partitions.emplace(disk_tag, pset);
}
}
}
}
bool cluster_balance_policy::get_next_move(const cluster_migration_info &cluster_info,
const partition_set &selected_pid,
/*out*/ move_info &next_move)
{
// key-app skew, value-app id
std::multimap<uint32_t, int32_t> app_skew_multimap = utils::flip_map(cluster_info.apps_skew);
auto max_app_skew = app_skew_multimap.rbegin()->first;
if (max_app_skew == 0) {
LOG_INFO("every app is balanced and any move will unbalance a app");
return false;
}
auto server_skew = get_skew(cluster_info.replicas_count);
if (max_app_skew <= 1 && server_skew <= 1) {
LOG_INFO("every app is balanced and the cluster as a whole is balanced");
return false;
}
/**
* Among the apps with maximum skew, attempt to pick a app where there is
* a move that improves the app skew and the cluster skew, if possible. If
* not, attempt to pick a move that improves the app skew.
**/
std::set<rpc_address> cluster_min_count_nodes;
std::set<rpc_address> cluster_max_count_nodes;
get_min_max_set(cluster_info.replicas_count, cluster_min_count_nodes, cluster_max_count_nodes);
bool found = false;
auto app_range = app_skew_multimap.equal_range(max_app_skew);
for (auto iter = app_range.first; iter != app_range.second; ++iter) {
auto app_id = iter->second;
auto it = cluster_info.apps_info.find(app_id);
if (it == cluster_info.apps_info.end()) {
continue;
}
auto app_map = it->second.replicas_count;
std::set<rpc_address> app_min_count_nodes;
std::set<rpc_address> app_max_count_nodes;
get_min_max_set(app_map, app_min_count_nodes, app_max_count_nodes);
/**
* Compute the intersection of the replica servers most loaded for the app
* with the replica servers most loaded overall, and likewise for least loaded.
* These are our ideal candidates for moving from and to, respectively.
**/
std::set<rpc_address> app_cluster_min_set =
utils::get_intersection(app_min_count_nodes, cluster_min_count_nodes);
std::set<rpc_address> app_cluster_max_set =
utils::get_intersection(app_max_count_nodes, cluster_max_count_nodes);
/**
* Do not move replicas of a balanced app if the least (most) loaded
* servers overall do not intersect the servers hosting the least (most)
* replicas of the app. Moving a replica in that case might keep the
* cluster skew the same or make it worse while keeping the app balanced.
**/
std::multimap<uint32_t, rpc_address> app_count_multimap = utils::flip_map(app_map);
if (app_count_multimap.rbegin()->first <= app_count_multimap.begin()->first + 1 &&
(app_cluster_min_set.empty() || app_cluster_max_set.empty())) {
LOG_INFO("do not move replicas of a balanced app({}) if the least (most) loaded "
"servers overall do not intersect the servers hosting the least (most) "
"replicas of the app",
app_id);
continue;
}
if (pick_up_move(cluster_info,
app_cluster_max_set.empty() ? app_max_count_nodes : app_cluster_max_set,
app_cluster_min_set.empty() ? app_min_count_nodes : app_cluster_min_set,
app_id,
selected_pid,
next_move)) {
found = true;
break;
}
}
return found;
}
template <typename S>
auto select_random(const S &s, size_t n)
{
auto it = std::begin(s);
std::advance(it, n);
return it;
}
bool cluster_balance_policy::pick_up_move(const cluster_migration_info &cluster_info,
const std::set<rpc_address> &max_nodes,
const std::set<rpc_address> &min_nodes,
const int32_t app_id,
const partition_set &selected_pid,
/*out*/ move_info &move_info)
{
std::set<app_disk_info> max_load_disk_set;
get_max_load_disk_set(cluster_info, max_nodes, app_id, max_load_disk_set);
if (max_load_disk_set.empty()) {
return false;
}
auto index = rand() % max_load_disk_set.size();
auto max_load_disk = *select_random(max_load_disk_set, index);
LOG_INFO("most load disk({}) on node({}) is picked, has {} partition",
max_load_disk.node,
max_load_disk.disk_tag,
max_load_disk.partitions.size());
for (const auto &node_addr : min_nodes) {
gpid picked_pid;
if (pick_up_partition(
cluster_info, node_addr, max_load_disk.partitions, selected_pid, picked_pid)) {
move_info.pid = picked_pid;
move_info.source_node = max_load_disk.node;
move_info.source_disk_tag = max_load_disk.disk_tag;
move_info.target_node = node_addr;
move_info.type = cluster_info.type;
LOG_INFO("partition[{}] will migrate from {} to {}",
picked_pid,
max_load_disk.node,
node_addr);
return true;
}
}
LOG_INFO("can not find a partition(app_id={}) from random max load disk(node={}, disk={})",
app_id,
max_load_disk.node,
max_load_disk.disk_tag);
return false;
}
void cluster_balance_policy::get_max_load_disk_set(
const cluster_migration_info &cluster_info,
const std::set<rpc_address> &max_nodes,
const int32_t app_id,
/*out*/ std::set<app_disk_info> &max_load_disk_set)
{
// key: partition count (app_disk_info.partitions.size())
// value: app_disk_info structure
std::multimap<uint32_t, app_disk_info> app_disk_info_multimap;
for (const auto &node_addr : max_nodes) {
// key: disk_tag
// value: partition set for app(app id=app_id) in node(addr=node_addr)
std::map<std::string, partition_set> disk_partitions =
get_disk_partitions_map(cluster_info, node_addr, app_id);
for (const auto &kv : disk_partitions) {
app_disk_info info;
info.app_id = app_id;
info.node = node_addr;
info.disk_tag = kv.first;
info.partitions = kv.second;
app_disk_info_multimap.insert(
std::pair<uint32_t, app_disk_info>(kv.second.size(), info));
}
}
auto range = app_disk_info_multimap.equal_range(app_disk_info_multimap.rbegin()->first);
for (auto iter = range.first; iter != range.second; ++iter) {
max_load_disk_set.insert(iter->second);
}
}
std::map<std::string, partition_set> cluster_balance_policy::get_disk_partitions_map(
const cluster_migration_info &cluster_info, const rpc_address &addr, const int32_t app_id)
{
std::map<std::string, partition_set> disk_partitions;
auto app_iter = cluster_info.apps_info.find(app_id);
auto node_iter = cluster_info.nodes_info.find(addr);
if (app_iter == cluster_info.apps_info.end() || node_iter == cluster_info.nodes_info.end()) {
return disk_partitions;
}
auto status = cluster_info.type == balance_type::COPY_SECONDARY ? partition_status::PS_SECONDARY
: partition_status::PS_PRIMARY;
auto app_partition = app_iter->second.partitions;
auto disk_partition = node_iter->second.partitions;
for (const auto &kv : disk_partition) {
auto disk_tag = kv.first;
for (const auto &pid : kv.second) {
if (pid.get_app_id() != app_id) {
continue;
}
auto status_map = app_partition[pid.get_partition_index()];
auto iter = status_map.find(addr);
if (iter != status_map.end() && iter->second == status) {
disk_partitions[disk_tag].insert(pid);
}
}
}
return disk_partitions;
}
bool cluster_balance_policy::pick_up_partition(const cluster_migration_info &cluster_info,
const rpc_address &min_node_addr,
const partition_set &max_load_partitions,
const partition_set &selected_pid,
/*out*/ gpid &picked_pid)
{
bool found = false;
for (const auto &pid : max_load_partitions) {
auto iter = cluster_info.apps_info.find(pid.get_app_id());
if (iter == cluster_info.apps_info.end()) {
continue;
}
// partition has already in mirgration list
if (selected_pid.find(pid) != selected_pid.end()) {
continue;
}
// partition has already been primary or secondary on min_node
app_migration_info info = iter->second;
if (info.get_partition_status(pid.get_partition_index(), min_node_addr) !=
partition_status::PS_INACTIVE) {
continue;
}
picked_pid = pid;
found = true;
break;
}
return found;
}
bool cluster_balance_policy::apply_move(const move_info &move,
/*out*/ partition_set &selected_pids,
/*out*/ migration_list &list,
/*out*/ cluster_migration_info &cluster_info)
{
int32_t app_id = move.pid.get_app_id();
rpc_address source = move.source_node, target = move.target_node;
if (cluster_info.apps_skew.find(app_id) == cluster_info.apps_skew.end() ||
cluster_info.replicas_count.find(source) == cluster_info.replicas_count.end() ||
cluster_info.replicas_count.find(target) == cluster_info.replicas_count.end() ||
cluster_info.apps_info.find(app_id) == cluster_info.apps_info.end()) {
return false;
}
app_migration_info app_info = cluster_info.apps_info[app_id];
if (app_info.partitions.size() <= move.pid.get_partition_index() ||
app_info.replicas_count.find(source) == app_info.replicas_count.end() ||
app_info.replicas_count.find(target) == app_info.replicas_count.end()) {
return false;
}
app_info.replicas_count[source]--;
app_info.replicas_count[target]++;
auto &pmap = app_info.partitions[move.pid.get_partition_index()];
rpc_address primary_addr;
for (const auto &kv : pmap) {
if (kv.second == partition_status::PS_PRIMARY) {
primary_addr = kv.first;
}
}
auto status = cluster_info.type == balance_type::COPY_SECONDARY ? partition_status::PS_SECONDARY
: partition_status::PS_PRIMARY;
auto iter = pmap.find(source);
if (iter == pmap.end() || iter->second != status) {
return false;
}
pmap.erase(source);
pmap[target] = status;
auto iters = cluster_info.nodes_info.find(source);
auto itert = cluster_info.nodes_info.find(target);
if (iters == cluster_info.nodes_info.end() || itert == cluster_info.nodes_info.end()) {
return false;
}
node_migration_info node_source = iters->second;
node_migration_info node_target = itert->second;
auto it = node_source.partitions.find(move.source_disk_tag);
if (it == node_source.partitions.end()) {
return false;
}
it->second.erase(move.pid);
node_target.future_partitions.insert(move.pid);
// add into migration list and selected_pid
partition_configuration pc;
pc.pid = move.pid;
pc.primary = primary_addr;
list[move.pid] = generate_balancer_request(*_global_view->apps, pc, move.type, source, target);
_migration_result->emplace(
move.pid, generate_balancer_request(*_global_view->apps, pc, move.type, source, target));
selected_pids.insert(move.pid);
cluster_info.apps_skew[app_id] = get_skew(app_info.replicas_count);
cluster_info.apps_info[app_id] = app_info;
cluster_info.nodes_info[source] = node_source;
cluster_info.nodes_info[target] = node_target;
cluster_info.replicas_count[source]--;
cluster_info.replicas_count[target]++;
return true;
}
} // namespace replication
} // namespace dsn