// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "meta/load_balance_policy.h"
#include <fmt/core.h>
#include <fmt/format.h>
// IWYU pragma: no_include <ext/alloc_traits.h>
#include <limits.h>
#include <nlohmann/json.hpp>
#include <nlohmann/json_fwd.hpp>
#include <algorithm>
#include <iterator>
#include <limits>
#include <mutex>
#include <ostream>
#include "absl/strings/string_view.h"
#include "dsn.layer2_types.h"
#include "meta/meta_data.h"
#include "meta_admin_types.h"
#include "runtime/rpc/dns_resolver.h"
#include "utils/command_manager.h"
#include "utils/fail_point.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"
#include "utils/string_conv.h"
#include "utils/strings.h"
DSN_DECLARE_uint64(min_live_node_count_for_unfreeze);
namespace dsn {
namespace replication {
configuration_proposal_action
new_proposal_action(const host_port &target, const host_port &node, config_type::type type)
{
const auto &target_addr = dsn::dns_resolver::instance().resolve_address(target);
const auto &node_addr = dsn::dns_resolver::instance().resolve_address(node);
configuration_proposal_action act;
act.__set_target(target_addr);
act.__set_node(node_addr);
act.__set_hp_target(target);
act.__set_hp_node(node);
act.__set_type(type);
return act;
}
void dump_disk_load(app_id id, const host_port &node, bool only_primary, const disk_load &load)
{
std::ostringstream load_string;
load_string << std::endl << "<<<<<<<<<<" << std::endl;
load_string << "load for " << node << ", "
<< "app id: " << id;
if (only_primary) {
load_string << ", only for primary";
}
load_string << std::endl;
for (const auto &kv : load) {
load_string << kv.first << ": " << kv.second << std::endl;
}
load_string << ">>>>>>>>>>";
LOG_DEBUG("{}", load_string.str());
}
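// Calculate the per-disk-tag load of one node for one app: count how many replicas
// (or only primaries, when only_primary is true) the node serves on each disk tag.
// Returns false if any replica's serving info is missing, in which case the caller
// should abort the current balance round.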
bool calc_disk_load(node_mapper &nodes,
const app_mapper &apps,
app_id id,
const host_port &node,
bool only_primary,
/*out*/ disk_load &load)
{
load.clear();
const node_state *ns = get_node_state(nodes, node, false);
CHECK_NOTNULL(ns, "can't find node({}) from node_state", node);
auto add_one_replica_to_disk_load = [&](const gpid &pid) {
LOG_DEBUG("add gpid({}) to node({}) disk load", pid, node);
const config_context &cc = *get_config_context(apps, pid);
auto iter = cc.find_from_serving(node);
if (iter == cc.serving.end()) {
LOG_WARNING(
"can't collect gpid({})'s info from {}, which is expected to serve it", pid, node);
return false;
} else {
load[iter->disk_tag]++;
return true;
}
};
if (only_primary) {
bool result = ns->for_each_primary(id, add_one_replica_to_disk_load);
dump_disk_load(id, node, only_primary, load);
return result;
} else {
bool result = ns->for_each_partition(id, add_one_replica_to_disk_load);
dump_disk_load(id, node, only_primary, load);
return result;
}
}
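// Collect the per-disk-tag load of every alive node for the given app. The function
// stops early when any node's replica info is incomplete, and the balancer should
// then give up on this round.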
std::unordered_map<dsn::host_port, disk_load> get_node_loads(const std::shared_ptr<app_state> &app,
const app_mapper &apps,
node_mapper &nodes,
bool only_primary)
{
std::unordered_map<dsn::host_port, disk_load> node_loads;
for (auto iter = nodes.begin(); iter != nodes.end(); ++iter) {
if (!calc_disk_load(
nodes, apps, app->app_id, iter->first, only_primary, node_loads[iter->first])) {
LOG_WARNING(
"stop the balancer as some replica info isn't collected, node({}), app({})",
iter->first,
app->get_logname());
return node_loads;
}
}
return node_loads;
}
const std::string &get_disk_tag(const app_mapper &apps, const host_port &node, const gpid &pid)
{
const config_context &cc = *get_config_context(apps, pid);
auto iter = cc.find_from_serving(node);
CHECK(iter != cc.serving.end(), "can't find disk tag of gpid({}) for {}", pid, node);
return iter->disk_tag;
}
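// Translate a balance decision into the ordered list of proposal actions that the
// meta server will execute one by one:
//   MOVE_PRIMARY:   downgrade the primary on "from" to secondary, then upgrade the
//                   secondary on "to" to primary;
//   COPY_PRIMARY:   add a new secondary on "to", downgrade the primary on "from",
//                   upgrade "to" to primary, then remove the replica on "from";
//   COPY_SECONDARY: add a new secondary on "to", then remove the secondary on "from".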
std::shared_ptr<configuration_balancer_request>
generate_balancer_request(const app_mapper &apps,
const partition_configuration &pc,
const balance_type &type,
const host_port &from,
const host_port &to)
{
FAIL_POINT_INJECT_F("generate_balancer_request",
[](absl::string_view name) { return nullptr; });
configuration_balancer_request result;
result.gpid = pc.pid;
std::string ans;
switch (type) {
case balance_type::MOVE_PRIMARY:
ans = "move_primary";
result.balance_type = balancer_request_type::move_primary;
result.action_list.emplace_back(
new_proposal_action(from, from, config_type::CT_DOWNGRADE_TO_SECONDARY));
result.action_list.emplace_back(
new_proposal_action(to, to, config_type::CT_UPGRADE_TO_PRIMARY));
break;
case balance_type::COPY_PRIMARY:
ans = "copy_primary";
result.balance_type = balancer_request_type::copy_primary;
result.action_list.emplace_back(
new_proposal_action(from, to, config_type::CT_ADD_SECONDARY_FOR_LB));
result.action_list.emplace_back(
new_proposal_action(from, from, config_type::CT_DOWNGRADE_TO_SECONDARY));
result.action_list.emplace_back(
new_proposal_action(to, to, config_type::CT_UPGRADE_TO_PRIMARY));
result.action_list.emplace_back(new_proposal_action(to, from, config_type::CT_REMOVE));
break;
case balance_type::COPY_SECONDARY:
ans = "copy_secondary";
result.balance_type = balancer_request_type::copy_secondary;
result.action_list.emplace_back(
new_proposal_action(pc.hp_primary, to, config_type::CT_ADD_SECONDARY_FOR_LB));
result.action_list.emplace_back(
new_proposal_action(pc.hp_primary, from, config_type::CT_REMOVE));
break;
default:
CHECK(false, "");
}
LOG_INFO("generate balancer: {} {} from {} of disk_tag({}) to {}",
pc.pid,
ans,
from,
get_disk_tag(apps, from, pc.pid),
to);
return std::make_shared<configuration_balancer_request>(std::move(result));
}
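// The "meta.lb.ignored_app_list" remote command is registered only once across all
// load_balance_policy instances, guarded by a static std::once_flag.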
load_balance_policy::load_balance_policy(meta_service *svc)
: _svc(svc), _ctrl_balancer_ignored_apps(nullptr)
{
static std::once_flag flag;
std::call_once(flag, [&]() {
_ctrl_balancer_ignored_apps = dsn::command_manager::instance().register_single_command(
"meta.lb.ignored_app_list",
"Get, set or clear balancer ignored_app_list",
"<get|set|clear> [set_app_id1,set_app_id2,...]",
[this](const std::vector<std::string> &args) {
return remote_command_balancer_ignored_app_ids(args);
});
});
}
load_balance_policy::~load_balance_policy() {}
void load_balance_policy::init(const meta_view *global_view, migration_list *list)
{
_global_view = global_view;
_migration_result = list;
const node_mapper &nodes = *_global_view->nodes;
_alive_nodes = nodes.size();
number_nodes(nodes);
}
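// Balance the primaries of one app in two steps: first try to rebalance by switching
// primary/secondary roles, modeled as a flow network solved with Ford-Fulkerson (see
// the ford_fulkerson class below); if no augmenting path exists and copying is
// allowed, fall back to copying primary replicas onto under-loaded nodes.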
bool load_balance_policy::primary_balance(const std::shared_ptr<app_state> &app,
bool only_move_primary)
{
CHECK_GE_MSG(_alive_nodes,
FLAGS_min_live_node_count_for_unfreeze,
"too few alive nodes will lead to freeze");
LOG_INFO("primary balancer for app({}:{})", app->app_name, app->app_id);
auto graph = ford_fulkerson::builder(app, *_global_view->nodes, host_port_id).build();
if (nullptr == graph) {
LOG_DEBUG("the primaries are balanced for app({}:{})", app->app_name, app->app_id);
return true;
}
auto path = graph->find_shortest_path();
if (path != nullptr) {
LOG_DEBUG("{} primaries are flew", path->_flow.back());
return move_primary(std::move(path));
} else {
LOG_INFO("we can't make the server load more balanced by moving primaries to secondaries");
if (!only_move_primary) {
return copy_primary(app, graph->have_less_than_average());
} else {
LOG_INFO("stop to copy primary for app({}) coz it is disabled", app->get_logname());
return true;
}
}
}
bool load_balance_policy::copy_primary(const std::shared_ptr<app_state> &app,
bool still_have_less_than_average)
{
node_mapper &nodes = *(_global_view->nodes);
const app_mapper &apps = *_global_view->apps;
int replicas_low = app->partition_count / _alive_nodes;
auto operation = std::make_unique<copy_primary_operation>(
app, apps, nodes, host_port_vec, host_port_id, still_have_less_than_average, replicas_low);
return operation->start(_migration_result);
}
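// Walk the augmenting path from the sink back towards the source and generate
// MOVE_PRIMARY actions edge by edge; the bottleneck flow path->_flow.back() is the
// number of primaries moved across each edge.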
bool load_balance_policy::move_primary(std::unique_ptr<flow_path> path)
{
// Used to calculate the primary disk load of each server.
// disk_load[disk_tag] is the number of primaries on that "disk_tag";
// if disk_load.find(disk_tag) == disk_load.end(), the load is 0.
disk_load loads[2];
disk_load *prev_load = &loads[0];
disk_load *current_load = &loads[1];
node_mapper &nodes = *(_global_view->nodes);
const app_mapper &apps = *(_global_view->apps);
int current = path->_prev.back();
if (!calc_disk_load(
nodes, apps, path->_app->app_id, host_port_vec[current], true, *current_load)) {
LOG_WARNING("stop move primary as some replica infos aren't collected, node({}), app({})",
host_port_vec[current],
path->_app->get_logname());
return false;
}
int plan_moving = path->_flow.back();
while (path->_prev[current] != 0) {
const auto &from = host_port_vec[path->_prev[current]];
const auto &to = host_port_vec[current];
if (!calc_disk_load(nodes, apps, path->_app->app_id, from, true, *prev_load)) {
LOG_WARNING(
"stop moving primaries as some replica info isn't collected, node({}), app({})",
from,
path->_app->get_logname());
return false;
}
start_moving_primary(path->_app, from, to, plan_moving, prev_load, current_load);
current = path->_prev[current];
std::swap(current_load, prev_load);
}
return true;
}
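// Emit MOVE_PRIMARY actions for one edge (from -> to) of the augmenting path.
// Among the partitions whose primary is on "from" and which have a secondary on
// "to", repeatedly pick the one whose move best evens out the disk loads, record
// the move in _migration_result, and keep both disk-load maps up to date.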
void load_balance_policy::start_moving_primary(const std::shared_ptr<app_state> &app,
const host_port &from,
const host_port &to,
int plan_moving,
disk_load *prev_load,
disk_load *current_load)
{
std::list<dsn::gpid> potential_moving = calc_potential_moving(app, from, to);
auto potential_moving_size = potential_moving.size();
CHECK_LE_MSG(plan_moving,
potential_moving_size,
"from({}) to({}) plan({}), can_move({})",
from,
to,
plan_moving,
potential_moving_size);
while (plan_moving-- > 0) {
dsn::gpid selected = select_moving(potential_moving, prev_load, current_load, from, to);
const partition_configuration &pc = app->partitions[selected.get_partition_index()];
auto balancer_result = _migration_result->emplace(
selected,
generate_balancer_request(
*_global_view->apps, pc, balance_type::MOVE_PRIMARY, from, to));
CHECK(balancer_result.second, "gpid({}) already inserted as an action", selected);
--(*prev_load)[get_disk_tag(*_global_view->apps, from, selected)];
++(*current_load)[get_disk_tag(*_global_view->apps, to, selected)];
}
}
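// List the partitions whose primary is on "from" and which already have a secondary
// on "to": these can be moved by a pure role switch without copying any data.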
std::list<dsn::gpid> load_balance_policy::calc_potential_moving(
const std::shared_ptr<app_state> &app, const host_port &from, const host_port &to)
{
std::list<dsn::gpid> potential_moving;
const node_state &ns = _global_view->nodes->find(from)->second;
ns.for_each_primary(app->app_id, [&](const gpid &pid) {
const partition_configuration &pc = app->partitions[pid.get_partition_index()];
if (is_secondary(pc, to)) {
potential_moving.push_back(pid);
}
return true;
});
return potential_moving;
}
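// Pick the candidate whose move relieves the most loaded disk on "from" while
// burdening the least loaded disk on "to", i.e. the gpid maximizing
// prev_load[disk_tag on "from"] - current_load[disk_tag on "to"].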
dsn::gpid load_balance_policy::select_moving(std::list<dsn::gpid> &potential_moving,
disk_load *prev_load,
disk_load *current_load,
const host_port &from,
const host_port &to)
{
std::list<dsn::gpid>::iterator selected = potential_moving.end();
int max = std::numeric_limits<int>::min();
for (auto it = potential_moving.begin(); it != potential_moving.end(); ++it) {
int load_difference = (*prev_load)[get_disk_tag(*_global_view->apps, from, *it)] -
(*current_load)[get_disk_tag(*_global_view->apps, to, *it)];
if (load_difference > max) {
max = load_difference;
selected = it;
}
}
CHECK(selected != potential_moving.end(), "can't find gpid to move from({}) to({})", from, to);
auto res = *selected;
potential_moving.erase(selected);
return res;
}
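// Run one balance operation (e.g. primary_balance) over every app, skipping apps
// that are ignored, unavailable, bulk loading or splitting. Returns false as soon as
// an app lacks enough replica info, or, when balancing app by app (balance_in_turn),
// as soon as actions have been generated for some app.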
bool load_balance_policy::execute_balance(
const app_mapper &apps,
bool balance_checker,
bool balance_in_turn,
bool only_move_primary,
const std::function<bool(const std::shared_ptr<app_state> &, bool)> &balance_operation)
{
for (const auto &kv : apps) {
const std::shared_ptr<app_state> &app = kv.second;
if (is_ignored_app(kv.first)) {
LOG_INFO("skip to do balance for the ignored app[{}]", app->get_logname());
continue;
}
if (app->status != app_status::AS_AVAILABLE || app->is_bulk_loading || app->splitting())
continue;
bool enough_information = balance_operation(app, only_move_primary);
if (!enough_information) {
// Even if we don't have enough information for the current app,
// the decisions already made for previous apps are kept.
return false;
}
if (!balance_checker) {
if (!_migration_result->empty()) {
if (balance_in_turn) {
LOG_INFO("stop to handle more apps after we found some actions for {}",
app->get_logname());
return false;
}
}
}
}
return true;
}
std::string
load_balance_policy::remote_command_balancer_ignored_app_ids(const std::vector<std::string> &args)
{
static const std::string invalid_arguments_message("invalid arguments");
nlohmann::json info;
do {
if (args.empty()) {
break;
}
if (args[0] == "set") {
return set_balancer_ignored_app_ids(args);
} else if (args[0] == "get") {
return get_balancer_ignored_app_ids();
} else if (args[0] == "clear") {
return clear_balancer_ignored_app_ids();
}
} while (false);
info["error"] = invalid_arguments_message;
return info.dump(2);
}
std::string load_balance_policy::set_balancer_ignored_app_ids(const std::vector<std::string> &args)
{
nlohmann::json info;
info["error"] = "invalid argument";
if (args.size() != 2) {
return info.dump(2);
}
std::vector<std::string> app_ids;
dsn::utils::split_args(args[1].c_str(), app_ids, ',');
if (app_ids.empty()) {
return info.dump(2);
}
std::set<app_id> app_list;
for (const auto &app_id_str : app_ids) {
app_id app;
if (!dsn::buf2int32(app_id_str, app)) {
return info.dump(2);
}
app_list.insert(app);
}
{
dsn::zauto_write_lock l(_balancer_ignored_apps_lock);
_balancer_ignored_apps = std::move(app_list);
}
info["error"] = "ok";
return info.dump(2);
}
std::string load_balance_policy::get_balancer_ignored_app_ids()
{
nlohmann::json data;
{
dsn::zauto_read_lock l(_balancer_ignored_apps_lock);
data["ignored_app_id_list"] = fmt::format("{}", fmt::join(_balancer_ignored_apps, ","));
}
return data.dump(2);
}
std::string load_balance_policy::clear_balancer_ignored_app_ids()
{
{
dsn::zauto_write_lock l(_balancer_ignored_apps_lock);
_balancer_ignored_apps.clear();
}
nlohmann::json info;
info["error"] = "ok";
return info.dump(2);
}
bool load_balance_policy::is_ignored_app(app_id app_id)
{
dsn::zauto_read_lock l(_balancer_ignored_apps_lock);
return _balancer_ignored_apps.find(app_id) != _balancer_ignored_apps.end();
}
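// Assign each alive node a consecutive id starting from 1; ids 0 and
// _alive_nodes + 1 are reserved for the source and the sink of the flow network.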
void load_balance_policy::number_nodes(const node_mapper &nodes)
{
int current_id = 1;
host_port_id.clear();
host_port_vec.resize(_alive_nodes + 2);
for (auto iter = nodes.begin(); iter != nodes.end(); ++iter) {
CHECK(iter->first && iter->second.host_port(), "invalid address");
CHECK(iter->second.alive(), "dead node");
host_port_id[iter->first] = current_id;
host_port_vec[current_id] = iter->first;
++current_id;
}
}
ford_fulkerson::ford_fulkerson(const std::shared_ptr<app_state> &app,
const node_mapper &nodes,
const std::unordered_map<dsn::host_port, int> &host_port_id,
uint32_t higher_count,
uint32_t lower_count,
int replicas_low)
: _app(app),
_nodes(nodes),
_host_port_id(host_port_id),
_higher_count(higher_count),
_lower_count(lower_count),
_replicas_low(replicas_low)
{
make_graph();
}
// Use a Dijkstra-like widest-path search: repeatedly pick the unvisited vertex with
// the largest bottleneck flow until the sink is reached or no augmenting path exists.
std::unique_ptr<flow_path> ford_fulkerson::find_shortest_path()
{
std::vector<bool> visit(_graph_nodes, false);
std::vector<int> flow(_graph_nodes, 0);
std::vector<int> prev(_graph_nodes, -1);
flow[0] = INT_MAX;
while (!visit.back()) {
auto pos = select_node(visit, flow);
if (pos == -1) {
break;
}
update_flow(pos, visit, _network, flow, prev);
}
if (visit.back() && flow.back() != 0) {
return std::make_unique<struct flow_path>(_app, std::move(flow), std::move(prev));
} else {
return nullptr;
}
}
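// Build the flow network on _nodes.size() + 2 vertices: vertex 0 is the source,
// vertex _graph_nodes - 1 is the sink, and each remaining vertex is a replica
// server. A node with more primaries than _replicas_low gets an edge from the
// source; otherwise it gets an edge to the sink (possibly of zero capacity). An
// edge i -> j counts the partitions whose primary is on i with a secondary on j.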
void ford_fulkerson::make_graph()
{
_graph_nodes = _nodes.size() + 2;
_network.resize(_graph_nodes, std::vector<int>(_graph_nodes, 0));
for (const auto &node : _nodes) {
int node_id = _host_port_id.at(node.first);
add_edge(node_id, node.second);
update_decree(node_id, node.second);
}
handle_corner_case();
}
void ford_fulkerson::add_edge(int node_id, const node_state &ns)
{
int primary_count = ns.primary_count(_app->app_id);
if (primary_count > _replicas_low) {
_network[0][node_id] = primary_count - _replicas_low;
} else {
_network[node_id].back() = _replicas_low - primary_count;
}
}
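// For every primary on this node, add one unit of capacity towards each node that
// holds a secondary of the same partition: moving such a primary is a pure role
// switch and needs no data copy.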
void ford_fulkerson::update_decree(int node_id, const node_state &ns)
{
ns.for_each_primary(_app->app_id, [&, this](const gpid &pid) {
const partition_configuration &pc = _app->partitions[pid.get_partition_index()];
for (const auto &secondary : pc.hp_secondaries) {
auto i = _host_port_id.find(secondary);
CHECK(i != _host_port_id.end(), "invalid secondary address, address = {}", secondary);
_network[node_id][i->second]++;
}
return true;
});
}
void ford_fulkerson::handle_corner_case()
{
// Suppose we have an app with 8 partitions in a cluster of 3 nodes (node1, node2
// and node3). The distribution of primaries among these nodes is as follows:
//   node1 : [0, 1, 2, 3]
//   node2 : [4, 5]
//   node3 : [6, 7]
// This is obviously unbalanced, but since no node holds fewer than replicas_low
// (8 / 3 = 2) primaries, every edge into the sink has zero capacity and primary
// migration would never be triggered. Shrinking each source edge by one (or widening
// each zero-capacity sink edge by one) makes the network admit a flow again.
if (_higher_count > 0 && _lower_count == 0) {
for (int i = 0; i != _graph_nodes; ++i) {
if (_network[0][i] > 0)
--_network[0][i];
else
++_network[i][_graph_nodes - 1];
}
}
}
int ford_fulkerson::select_node(std::vector<bool> &visit, const std::vector<int> &flow)
{
auto pos = max_value_pos(visit, flow);
if (pos != -1) {
visit[pos] = true;
}
return pos;
}
int ford_fulkerson::max_value_pos(const std::vector<bool> &visit, const std::vector<int> &flow)
{
int pos = -1, max_value = 0;
for (auto i = 0; i != _graph_nodes; ++i) {
if (!visit[i] && flow[i] > max_value) {
pos = i;
max_value = flow[i];
}
}
return pos;
}
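// Relax all unvisited vertices reachable from "pos": the bottleneck capacity of a
// path is the minimum edge capacity along it, so a neighbor's flow is raised to
// min(flow[pos], network[pos][i]) whenever that is larger than its current flow.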
void ford_fulkerson::update_flow(int pos,
const std::vector<bool> &visit,
const std::vector<std::vector<int>> &network,
std::vector<int> &flow,
std::vector<int> &prev)
{
for (auto i = 0; i != _graph_nodes; ++i) {
if (visit[i]) {
continue;
}
auto min = std::min(flow[pos], network[pos][i]);
if (min > flow[i]) {
flow[i] = min;
prev[i] = pos;
}
}
}
copy_replica_operation::copy_replica_operation(
const std::shared_ptr<app_state> app,
const app_mapper &apps,
node_mapper &nodes,
const std::vector<dsn::host_port> &host_port_vec,
const std::unordered_map<dsn::host_port, int> &host_port_id)
: _app(app),
_apps(apps),
_nodes(nodes),
_host_port_vec(host_port_vec),
_host_port_id(host_port_id)
{
}
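// Greedily move replicas from the most loaded node to the least loaded one, one
// partition at a time, until can_continue() decides the distribution is (or will
// become) balanced. When the most loaded node has no selectable partition, it is
// dropped from the candidate set.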
bool copy_replica_operation::start(migration_list *result)
{
init_ordered_host_port_ids();
_node_loads = get_node_loads(_app, _apps, _nodes, only_copy_primary());
if (_node_loads.size() != _nodes.size()) {
return false;
}
while (true) {
if (!can_continue()) {
break;
}
gpid selected_pid = select_partition(result);
if (selected_pid.get_app_id() != -1) {
copy_once(selected_pid, result);
update_ordered_host_port_ids();
} else {
_ordered_host_port_ids.erase(--_ordered_host_port_ids.end());
}
}
return true;
}
const partition_set *copy_replica_operation::get_all_partitions()
{
int id_max = *_ordered_host_port_ids.rbegin();
const node_state &ns = _nodes.find(_host_port_vec[id_max])->second;
const partition_set *partitions = ns.partitions(_app->app_id, only_copy_primary());
return partitions;
}
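// Among the heaviest node's partitions, prefer the one sitting on its most loaded
// disk, skipping partitions already scheduled in this round.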
gpid copy_replica_operation::select_max_load_gpid(const partition_set *partitions,
migration_list *result)
{
int id_max = *_ordered_host_port_ids.rbegin();
const auto &load_on_max = _node_loads.at(_host_port_vec[id_max]);
gpid selected_pid(-1, -1);
int max_load = -1;
for (const gpid &pid : *partitions) {
if (!can_select(pid, result)) {
continue;
}
const std::string &disk_tag = get_disk_tag(_apps, _host_port_vec[id_max], pid);
auto load = load_on_max.at(disk_tag);
if (load > max_load) {
selected_pid = pid;
max_load = load;
}
}
return selected_pid;
}
void copy_replica_operation::copy_once(gpid selected_pid, migration_list *result)
{
const auto &from = _host_port_vec[*_ordered_host_port_ids.rbegin()];
const auto &to = _host_port_vec[*_ordered_host_port_ids.begin()];
auto pc = _app->partitions[selected_pid.get_partition_index()];
auto request = generate_balancer_request(_apps, pc, get_balance_type(), from, to);
result->emplace(selected_pid, request);
}
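// After one partition is moved from the heaviest node to the lightest one, adjust
// both partition counts and re-insert the two ids so that the ordered set stays
// sorted by partition count.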
void copy_replica_operation::update_ordered_host_port_ids()
{
int id_min = *_ordered_host_port_ids.begin();
int id_max = *_ordered_host_port_ids.rbegin();
--_partition_counts[id_max];
++_partition_counts[id_min];
_ordered_host_port_ids.erase(_ordered_host_port_ids.begin());
_ordered_host_port_ids.erase(--_ordered_host_port_ids.end());
_ordered_host_port_ids.insert(id_max);
_ordered_host_port_ids.insert(id_min);
}
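// Order the node ids by how many partitions (primaries, for the copy-primary
// operation) each node currently serves; ties are broken by id.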
void copy_replica_operation::init_ordered_host_port_ids()
{
_partition_counts.resize(_host_port_vec.size(), 0);
for (const auto &iter : _nodes) {
auto id = _host_port_id.at(iter.first);
_partition_counts[id] = get_partition_count(iter.second);
}
std::set<int, std::function<bool(int left, int right)>> ordered_queue(
[this](int left, int right) {
return _partition_counts[left] != _partition_counts[right]
? _partition_counts[left] < _partition_counts[right]
: left < right;
});
for (const auto &iter : _nodes) {
auto id = _host_port_id.at(iter.first);
ordered_queue.insert(id);
}
_ordered_host_port_ids.swap(ordered_queue);
}
gpid copy_replica_operation::select_partition(migration_list *result)
{
const partition_set *partitions = get_all_partitions();
int id_max = *_ordered_host_port_ids.rbegin();
const node_state &ns = _nodes.find(_host_port_vec[id_max])->second;
CHECK(partitions != nullptr && !partitions->empty(),
"the most loaded node({}) shouldn't have an empty partition set",
ns.host_port());
return select_max_load_gpid(partitions, result);
}
copy_primary_operation::copy_primary_operation(
const std::shared_ptr<app_state> app,
const app_mapper &apps,
node_mapper &nodes,
const std::vector<dsn::host_port> &host_port_vec,
const std::unordered_map<dsn::host_port, int> &host_port_id,
bool have_lower_than_average,
int replicas_low)
: copy_replica_operation(app, apps, nodes, host_port_vec, host_port_id)
{
_have_lower_than_average = have_lower_than_average;
_replicas_low = replicas_low;
}
int copy_primary_operation::get_partition_count(const node_state &ns) const
{
return ns.primary_count(_app->app_id);
}
bool copy_primary_operation::can_select(gpid pid, migration_list *result)
{
return result->find(pid) == result->end();
}
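// Decide whether another copy is still useful: stop once the lightest node has
// reached _replicas_low (when some node was below the average), or, when every node
// already meets the average, once the gap between the heaviest and the lightest
// node is at most one.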
bool copy_primary_operation::can_continue()
{
int id_min = *_ordered_host_port_ids.begin();
if (_have_lower_than_average && _partition_counts[id_min] >= _replicas_low) {
LOG_INFO("{}: stop the copy due to primaries on all nodes will reach low later.",
_app->get_logname());
return false;
}
int id_max = *_ordered_host_port_ids.rbegin();
if (!_have_lower_than_average && _partition_counts[id_max] - _partition_counts[id_min] <= 1) {
LOG_INFO("{}: stop the copy due to the primary will be balanced later.",
_app->get_logname());
return false;
}
return true;
}
enum balance_type copy_primary_operation::get_balance_type() { return balance_type::COPY_PRIMARY; }
} // namespace replication
} // namespace dsn