blob: 474935d89799b9f6614ebc15e47ac9756f94b13b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
// IWYU pragma: no_include <ext/alloc_traits.h>
#include <gtest/gtest_prod.h>
#include <algorithm>
#include <cstdint>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <utility>
#include <vector>
#include "common/gpid.h"
#include "load_balance_policy.h"
#include "meta/meta_data.h"
#include "metadata_types.h"
#include "runtime/rpc/rpc_address.h"
namespace dsn {
namespace replication {
class meta_service;
uint32_t get_partition_count(const node_state &ns, balance_type type, int32_t app_id);
uint32_t get_skew(const std::map<rpc_address, uint32_t> &count_map);
void get_min_max_set(const std::map<rpc_address, uint32_t> &node_count_map,
/*out*/ std::set<rpc_address> &min_set,
/*out*/ std::set<rpc_address> &max_set);
class cluster_balance_policy : public load_balance_policy
{
public:
cluster_balance_policy(meta_service *svc);
~cluster_balance_policy() = default;
void balance(bool checker, const meta_view *global_view, migration_list *list) override;
private:
struct app_disk_info;
struct app_migration_info;
struct cluster_migration_info;
struct move_info;
struct node_migration_info;
bool cluster_replica_balance(const meta_view *global_view,
const balance_type type,
/*out*/ migration_list &list);
bool do_cluster_replica_balance(const meta_view *global_view,
const balance_type type,
/*out*/ migration_list &list);
bool get_cluster_migration_info(const meta_view *global_view,
const balance_type type,
/*out*/ cluster_migration_info &cluster_info);
bool get_app_migration_info(std::shared_ptr<app_state> app,
const node_mapper &nodes,
const balance_type type,
/*out*/ app_migration_info &info);
void get_node_migration_info(const node_state &ns,
const app_mapper &all_apps,
/*out*/ node_migration_info &info);
bool get_next_move(const cluster_migration_info &cluster_info,
const partition_set &selected_pid,
/*out*/ move_info &next_move);
bool pick_up_move(const cluster_migration_info &cluster_info,
const std::set<rpc_address> &max_nodes,
const std::set<rpc_address> &min_nodes,
const int32_t app_id,
const partition_set &selected_pid,
/*out*/ move_info &move_info);
void get_max_load_disk_set(const cluster_migration_info &cluster_info,
const std::set<rpc_address> &max_nodes,
const int32_t app_id,
/*out*/ std::set<app_disk_info> &max_load_disk_set);
std::map<std::string, partition_set> get_disk_partitions_map(
const cluster_migration_info &cluster_info, const rpc_address &addr, const int32_t app_id);
bool pick_up_partition(const cluster_migration_info &cluster_info,
const rpc_address &min_node_addr,
const partition_set &max_load_partitions,
const partition_set &selected_pid,
/*out*/ gpid &picked_pid);
bool apply_move(const move_info &move,
/*out*/ partition_set &selected_pids,
/*out*/ migration_list &list,
/*out*/ cluster_migration_info &cluster_info);
struct app_migration_info
{
int32_t app_id;
std::string app_name;
std::vector<std::map<rpc_address, partition_status::type>> partitions;
std::map<rpc_address, uint32_t> replicas_count;
bool operator<(const app_migration_info &another) const
{
if (app_id < another.app_id)
return true;
return false;
}
bool operator==(const app_migration_info &another) const
{
return app_id == another.app_id;
}
partition_status::type get_partition_status(int32_t pidx, rpc_address addr)
{
for (const auto &kv : partitions[pidx]) {
if (kv.first == addr) {
return kv.second;
}
}
return partition_status::PS_INACTIVE;
}
};
struct node_migration_info
{
rpc_address address;
// key-disk tag, value-partition set
std::map<std::string, partition_set> partitions;
partition_set future_partitions;
bool operator<(const node_migration_info &another) const
{
return address < another.address;
}
bool operator==(const node_migration_info &another) const
{
return address == another.address;
}
};
struct cluster_migration_info
{
balance_type type;
std::map<int32_t, uint32_t> apps_skew;
std::map<int32_t, app_migration_info> apps_info;
std::map<rpc_address, node_migration_info> nodes_info;
std::map<rpc_address, uint32_t> replicas_count;
};
struct app_disk_info
{
int32_t app_id;
rpc_address node;
std::string disk_tag;
partition_set partitions;
bool operator==(const app_disk_info &another) const
{
return app_id == another.app_id && node == another.node && disk_tag == another.disk_tag;
}
bool operator<(const app_disk_info &another) const
{
if (app_id < another.app_id || (app_id == another.app_id && node < another.node) ||
(app_id == another.app_id && node == another.node && disk_tag < another.disk_tag))
return true;
return false;
}
};
struct move_info
{
gpid pid;
rpc_address source_node;
std::string source_disk_tag;
rpc_address target_node;
balance_type type;
};
FRIEND_TEST(cluster_balance_policy, app_migration_info);
FRIEND_TEST(cluster_balance_policy, node_migration_info);
FRIEND_TEST(cluster_balance_policy, get_skew);
FRIEND_TEST(cluster_balance_policy, get_partition_count);
FRIEND_TEST(cluster_balance_policy, get_app_migration_info);
FRIEND_TEST(cluster_balance_policy, get_node_migration_info);
FRIEND_TEST(cluster_balance_policy, get_disk_partitions_map);
FRIEND_TEST(cluster_balance_policy, get_max_load_disk_set);
FRIEND_TEST(cluster_balance_policy, apply_move);
FRIEND_TEST(cluster_balance_policy, pick_up_partition);
FRIEND_TEST(cluster_balance_policy, execute_balance);
};
} // namespace replication
} // namespace dsn