blob: 15cf5bcf25ec39d4ec589bf49f7d90d5dc0419e4 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <cstddef>
#include <cstdint>
#include <random>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include <boost/optional/optional.hpp>
#include "kudu/rebalance/cluster_status.h"
#include "kudu/util/status.h"
namespace kudu {
namespace rebalance {
// Forward declarations of types that this header refers to only by pointer
// or by reference; their definitions are not visible here.
struct ClusterInfo;
struct TableReplicaMove;
struct TabletsPlacementInfo;
// Sub-set of fields from ClusterResult which are relevant to the rebalancing.
struct ClusterRawInfo {
// Health summaries of tablet servers.
std::vector<cluster_summary::ServerHealthSummary> tserver_summaries;
// Summaries of tables.
std::vector<cluster_summary::TableSummary> table_summaries;
// Summaries of tablets.
std::vector<cluster_summary::TabletSummary> tablet_summaries;
// Tablet servers which are in maintenance mode.
// NOTE(review): presumably the elements are tablet server UUIDs -- confirm.
std::unordered_set<std::string> tservers_in_maintenance_mode;
// A class implementing logic for Kudu cluster rebalancing.
// A Rebalancer object encapsulates structs with cluster information
// and functions needed to calculate and implement replica moves.
class Rebalancer {
// Configuration parameters for the rebalancer aggregated into a struct.
struct Config {
// Default value for the 'load_imbalance_threshold' constructor parameter.
static constexpr double kLoadImbalanceThreshold = 1.0;
// Every parameter has a default value, so this constructor can also be
// invoked without arguments. It is deliberately not marked 'explicit',
// hence the lint suppression right below.
// Note that 'ignored_tservers_param' is a vector whereas the
// 'ignored_tservers' field is a set (NOTE(review): presumably the constructor
// definition, not visible here, populates the field from the parameter).
// NOLINTNEXTLINE(google-explicit-constructor)
Config(std::vector<std::string> ignored_tservers_param = {},
std::vector<std::string> master_addresses = {},
std::vector<std::string> table_filters = {},
size_t max_moves_per_server = 5,
size_t max_staleness_interval_sec = 300,
int64_t max_run_time_sec = 0,
bool move_replicas_from_ignored_tservers = false,
bool move_rf1_replicas = false,
bool output_replica_distribution_details = false,
bool run_policy_fixer = true,
bool run_cross_location_rebalancing = true,
bool run_intra_location_rebalancing = true,
double load_imbalance_threshold = kLoadImbalanceThreshold,
bool force_rebalance_replicas_on_maintenance_tservers = false);
// UUIDs of ignored servers. If empty, run the rebalancing on
// all tablet servers in the cluster only when all tablet servers
// in cluster are healthy. If not empty, specified tablet servers
// (including their health state and replicas on them) will be
// ignored by the rebalancer tool.
std::unordered_set<std::string> ignored_tservers;
// Kudu masters' RPC endpoints.
std::vector<std::string> master_addresses;
// Names of tables to balance. If empty, every table and the whole cluster
// will be balanced.
std::vector<std::string> table_filters;
// Maximum number of move operations to run concurrently on one server.
// An 'operation on a server' means a move operation where either source or
// destination replica is located on the specified server.
size_t max_moves_per_server;
// Maximum duration of the 'staleness' interval, when the rebalancer cannot
// make any progress in scheduling new moves and no prior scheduled moves
// are left, even if re-synchronizing against the cluster's state again and
// again. Such a staleness usually happens in case of a persistent problem
// with the cluster or when some unexpected concurrent activity is present
// (such as automatic recovery of failed replicas, etc.).
size_t max_staleness_interval_sec;
// Maximum run time, in seconds.
// NOTE(review): the constructor default is 0; presumably that means
// 'no limit' -- confirm against the implementation.
int64_t max_run_time_sec;
// Whether to move replicas from ignored tservers to other tservers.
// If true, replicas on healthy ignored tservers will be moved to other tservers
// before running the rebalancing.
// If false, ignored tservers and replicas on them will be ignored by the
// rebalancer tool.
bool move_replicas_from_ignored_tservers;
// Whether to move replicas of tablets with replication factor of one.
bool move_rf1_replicas;
// Whether Rebalancer::PrintStats() should output per-table and per-server
// replica distribution details.
bool output_replica_distribution_details;
// In case of multi-location cluster, whether to detect and fix placement
// policy violations. Fixing placement policy violations involves moving
// tablet replicas across different locations in the cluster.
// This setting is applicable to multi-location clusters only.
bool run_policy_fixer;
// In case of multi-location cluster, whether to move tablet replicas
// between locations in an attempt to spread tablet replicas among locations
// evenly (equalizing loads of locations throughout the cluster).
// This setting is applicable to multi-location clusters only.
bool run_cross_location_rebalancing;
// In case of multi-location cluster, whether to rebalance tablet replica
// distribution within each location.
// This setting is applicable to multi-location clusters only.
bool run_intra_location_rebalancing;
// The per-table location load imbalance threshold for the cross-location
// balancing algorithm.
double load_imbalance_threshold;
// NOTE(review): presumably controls whether replicas hosted on tablet
// servers in maintenance mode are subject to rebalancing as well; the exact
// semantics are defined by the implementation, which is not visible here.
bool force_rebalance_replicas_on_maintenance_tservers;
// Represents a concrete move of a replica from one tablet server to another.
// Formed logically from a TableReplicaMove by specifying a tablet for the move.
struct ReplicaMove {
// UUID of the tablet whose replica is to be moved.
std::string tablet_uuid;
// UUID of the tablet server to move the replica from.
std::string ts_uuid_from;
// UUID of the tablet server to move the replica to.
std::string ts_uuid_to;
// Optional: the field may be left unset.
boost::optional<int64_t> config_opid_idx; // for CAS-enabled Raft changes
enum class RunStatus {
// Alias for the container of in-progress moves: every entry is keyed by the
// tablet UUID, i.e. the key equals the 'tablet_uuid' field of the mapped
// ReplicaMove.
using MovesInProgress = std::unordered_map<std::string, ReplicaMove>;
// Create a rebalancer with the specified configuration. The 'config'
// parameter is taken by value; marked 'explicit' to prevent implicit
// conversion of a Config into a Rebalancer.
explicit Rebalancer(Config config);
// Filter the list of candidate tablets to make sure the location
// of the destination server would not contain the majority of replicas
// after the move. The 'tablet_ids' is an in-out parameter.
// The 'location_by_ts_id' parameter is a string-to-string map
// (NOTE(review): presumably tablet server identifier --> location; confirm).
// The conditions under which a non-OK status is returned are defined by the
// implementation, which is not visible in this header.
static Status FilterCrossLocationTabletCandidates(
const std::unordered_map<std::string, std::string>& location_by_ts_id,
const TabletsPlacementInfo& placement_info,
const TableReplicaMove& move,
std::vector<std::string>* tablet_ids);
// Given high-level move-some-tablet-replica-for-a-table information from the
// rebalancing algorithm, find appropriate tablet replicas to move between the
// specified tablet servers. The set of result tablet UUIDs is output
// into the 'tablet_ids' container (note: the container is first cleared).
// The source and destination replicas can then be determined by the elements
// of the 'tablet_ids' container and tablet server UUIDs TableReplicaMove::from
// and TableReplicaMove::to correspondingly. If no suitable tablet replicas are
// found, 'tablet_ids' will be empty.
static void FindReplicas(const TableReplicaMove& move,
const ClusterRawInfo& raw_info,
std::vector<std::string>* tablet_ids);
// Convert the 'raw' information about the cluster into information suitable
// for the input of the high-level rebalancing algorithm.
// The 'moves_in_progress' parameter contains information on the replica moves
// which have been scheduled by a caller and are still in progress: those are
// considered as successfully completed and applied to the 'raw_info' when
// building ClusterInfo for the specified 'raw_info' input. The idea
// is to prevent the algorithm outputting the same moves again while some
// of the moves recommended at prior steps are still in progress.
// The result cluster balance information is output into the 'info' parameter.
// The 'info' output parameter cannot be null.
Status BuildClusterInfo(const ClusterRawInfo& raw_info,
const MovesInProgress& moves_in_progress,
ClusterInfo* info) const;
// A tablet identifier paired with an optional Raft configuration index.
struct TabletInfo {
// Identifier of the tablet.
std::string tablet_id;
// Optional: the field may be left unset.
boost::optional<int64_t> config_idx; // For CAS-like change of Raft configs.
// Alias for a map keyed by tablet server UUID; the mapped value lists the
// tablets located on that server.
using TServersToEmptyMap = std::unordered_map<std::string, std::vector<TabletInfo>>;
// Output a set of tablet servers into the 'tservers_to_empty' parameter,
// using 'raw_info' as the input.
// NOTE(review): presumably the result contains UUIDs of tablet servers whose
// replicas are to be moved elsewhere (cf. the
// Config::move_replicas_from_ignored_tservers setting) -- confirm against the
// implementation, which is not visible in this header.
void GetTServersToEmpty(const ClusterRawInfo& raw_info,
std::unordered_set<std::string>* tservers_to_empty) const;
// Output a tablet server UUID --> tablets map into the
// 'tservers_to_empty_map' parameter, given the raw cluster information,
// the moves in progress, and the set of tablet servers to empty.
// NOTE(review): how 'moves_in_progress' affects the result (presumably,
// tablets with moves already in progress are accounted for) is defined by
// the implementation, which is not visible in this header -- confirm.
static void BuildTServersToEmptyInfo(const ClusterRawInfo& raw_info,
const MovesInProgress& moves_in_progress,
const std::unordered_set<std::string>& tservers_to_empty,
TServersToEmptyMap* tservers_to_empty_map);
// Helper class to find and schedule next available rebalancing move operation
// and track already scheduled ones.
// This is an abstract interface: every method except the destructor is pure
// virtual.
class Runner {
// Virtual destructor: allows destroying implementations via a Runner pointer.
virtual ~Runner() = default;
// Initialize instance of Runner so it can run against Kudu cluster with
// the 'master_addresses' RPC endpoints.
virtual Status Init(std::vector<std::string> master_addresses) = 0;
// Load information on the prescribed replica movement operations. Also,
// populate helper containers and other auxiliary run-time structures
// used by ScheduleNextMove(). This method is called with every batch
// of move operations output by the rebalancing algorithm once previously
// loaded moves have been scheduled.
virtual void LoadMoves(std::vector<Rebalancer::ReplicaMove> replica_moves) = 0;
// Schedule next replica move. Returns 'true' if replica move operation
// has been scheduled successfully; otherwise returns 'false' and sets
// the 'has_errors' and 'timed_out' parameters accordingly.
virtual bool ScheduleNextMove(bool* has_errors, bool* timed_out) = 0;
// Update statuses and auxiliary information on in-progress replica move
// operations.
// The 'timed_out' parameter is set to 'true' if not all in-progress
// operations were processed by the deadline specified by the 'deadline_'
// member field.
// The 'has_errors' parameter is set to 'true' if there were errors while
// trying to get the statuses of in-progress operations.
// The 'has_pending_moves' parameter is set to 'true' if there were any
// in-progress operations and we could check their statuses.
// The method returns 'true' if at least one in-progress move operation was
// completed (success or failure).
virtual bool UpdateMovesInProgressStatus(bool* has_errors,
bool* timed_out,
bool* has_pending_moves) = 0;
// Get the next batch of moves.
// NOTE(review): presumably 'has_moves' is set to 'true' if there are moves
// to schedule; the exact contract is defined by the implementations, which
// are not visible in this header -- confirm.
virtual Status GetNextMoves(bool* has_moves) = 0;
// Return the number of moves.
// NOTE(review): presumably the number of moves scheduled so far by this
// runner -- confirm against the implementations.
virtual uint32_t moves_count() const = 0;
}; // class Runner
// Grant the test class access to the non-public members of Rebalancer.
friend class KsckResultsToClusterBalanceInfoTest;
// Filter move operations in 'replica_moves': remove all operations that would
// involve moving replicas of tablets which are in 'scheduled_moves'. The
// 'replica_moves' cannot be null. The 'replica_moves' is an in-out parameter:
// the operations are removed from the container it points to.
static void FilterMoves(const MovesInProgress& scheduled_moves,
std::vector<ReplicaMove>* replica_moves);
// Configuration for the rebalancer. Declared 'const', so it cannot be
// modified once the Rebalancer object has been constructed.
const Config config_;
// Hold information about a tablet's replication factor and replicas.
struct TabletExtraInfo {
// Replication factor of the tablet.
int replication_factor;
// NOTE(review): presumably the number of voter replicas of the tablet --
// confirm against the code which populates this structure.
int num_voters;
// Populate the 'extra_info_by_tablet_id' map using 'raw_info' as the input:
// the map is keyed by 'tablet_id' and each value is a TabletExtraInfo entry
// (which carries, in particular, the target tablet replication factor).
void BuildTabletExtraInfoMap(
const ClusterRawInfo& raw_info,
std::unordered_map<std::string, TabletExtraInfo>* extra_info_by_tablet_id);
// For a given table, find a tablet replica on a specified tserver to move
// to another. Given the requested 'move', shuffle the table's tablet_ids
// and select the first tablet that doesn't currently have any replicas in
// 'tablets_in_move'. Add the tablet's id to 'tablets_in_move' and add
// information about this move to 'replica_moves'. If the chosen tablet is
// overreplicated, no destination tserver is specified, in case it is better
// to just remove it from the replica distribution entirely.
// The 'tablet_ids' parameter is taken by value, so the caller's container
// is not modified by this method.
// NOTE(review): presumably 'random_generator' is the source of randomness
// for the shuffle -- confirm against the implementation.
Status SelectReplicaToMove(
const TableReplicaMove& move,
const std::unordered_map<std::string, TabletExtraInfo>& extra_info_by_tablet_id,
std::mt19937* random_generator,
std::vector<std::string> tablet_ids,
std::unordered_set<std::string>* tablets_in_move,
std::vector<Rebalancer::ReplicaMove>* replica_moves);
} // namespace rebalance
} // namespace kudu