| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| #pragma once |
| |
| #include <cstddef> |
| #include <cstdint> |
| #include <optional> |
| #include <random> |
| #include <string> |
| #include <unordered_map> |
| #include <unordered_set> |
| #include <vector> |
| |
| #include "kudu/rebalance/cluster_status.h" |
| #include "kudu/util/status.h" |
| |
| |
| namespace kudu { |
| namespace rebalance { |
| |
| struct ClusterInfo; |
| struct TableReplicaMove; |
| struct TabletsPlacementInfo; |
| |
| // Sub-set of fields from ClusterResult which are relevant to the rebalancing. |
| struct ClusterRawInfo { |
| std::vector<cluster_summary::ServerHealthSummary> tserver_summaries; |
| std::vector<cluster_summary::TableSummary> table_summaries; |
| std::vector<cluster_summary::TabletSummary> tablet_summaries; |
| std::unordered_set<std::string> tservers_in_maintenance_mode; |
| }; |
| |
| // A class implementing logic for Kudu cluster rebalancing. |
| // A Rebalancer object encapsulates structs with cluster information |
| // and functions needed to calculate and implement replica moves. |
| class Rebalancer { |
| public: |
| // Configuration parameters for the rebalancer aggregated into a struct. |
| struct Config { |
| static constexpr double kLoadImbalanceThreshold = 1.0; |
| |
| // NOLINTNEXTLINE(google-explicit-constructor) |
| Config(std::vector<std::string> ignored_tservers_param = {}, |
| std::vector<std::string> master_addresses = {}, |
| std::vector<std::string> table_filters = {}, |
| size_t max_moves_per_server = 5, |
| size_t max_staleness_interval_sec = 300, |
| int64_t max_run_time_sec = 0, |
| bool move_replicas_from_ignored_tservers = false, |
| bool move_rf1_replicas = false, |
| bool output_replica_distribution_details = false, |
| bool run_policy_fixer = true, |
| bool run_cross_location_rebalancing = true, |
| bool run_intra_location_rebalancing = true, |
| double load_imbalance_threshold = kLoadImbalanceThreshold, |
| bool force_rebalance_replicas_on_maintenance_tservers = false, |
| size_t intra_location_rebalancing_concurrency = 0, |
| bool enable_range_rebalancing = false); |
| |
| // UUIDs of ignored servers. If empty, run the rebalancing on |
| // all tablet servers in the cluster only when all tablet servers |
| // in cluster are healthy. If not empty, specified tablet servers |
| // (including their health state and replicas on them) will be |
| // ignored by the rebalancer tool. |
| std::unordered_set<std::string> ignored_tservers; |
| |
| // Kudu masters' RPC endpoints. |
| std::vector<std::string> master_addresses; |
| |
| // Names of tables to balance. If empty, every table and the whole cluster |
| // will be balanced. |
| std::vector<std::string> table_filters; |
| |
| // Maximum number of move operations to run concurrently on one server. |
| // An 'operation on a server' means a move operation where either source or |
| // destination replica is located on the specified server. |
| size_t max_moves_per_server; |
| |
| // Maximum duration of the 'staleness' interval, when the rebalancer cannot |
| // make any progress in scheduling new moves and no prior scheduled moves |
| // are left, even if re-synchronizing against the cluster's state again and |
| // again. Such a staleness usually happens in case of a persistent problem |
| // with the cluster or when some unexpected concurrent activity is present |
| // (such as automatic recovery of failed replicas, etc.). |
| size_t max_staleness_interval_sec; |
| |
| // Maximum run time, in seconds. |
| int64_t max_run_time_sec; |
| |
| // Whether to move replicas from ignored tservers to other tservers. |
| // If true, replicas on healthy ignored tservers will be moved to other tservers |
| // before running the rebalancing. |
| // If false, ignored tservers and replicas on them will be ignored by the |
| // rebalancer tool. |
| bool move_replicas_from_ignored_tservers; |
| |
| // Whether to move replicas of tablets with replication factor of one. |
| bool move_rf1_replicas; |
| |
| // Whether Rebalancer::PrintStats() should output per-table and per-server |
| // replica distribution details. |
| bool output_replica_distribution_details; |
| |
| // In case of multi-location cluster, whether to detect and fix placement |
| // policy violations. Fixing placement policy violations involves moving |
| // tablet replicas across different locations in the cluster. |
| // This setting is applicable to multi-location clusters only. |
| bool run_policy_fixer; |
| |
| // In case of multi-location cluster, whether to move tablet replicas |
| // between locations in attempt to spread tablet replicas among location |
| // evenly (equalizing loads of locations throughout the cluster). |
| // This setting is applicable to multi-location clusters only. |
| bool run_cross_location_rebalancing; |
| |
| // In case of multi-location cluster, whether to rebalance tablet replica |
| // distribution within each location. |
| // This setting is applicable to multi-location clusters only. |
| bool run_intra_location_rebalancing; |
| |
| // The per-table location load imbalance threshold for the cross-location |
| // balancing algorithm. |
| double load_imbalance_threshold; |
| |
| bool force_rebalance_replicas_on_maintenance_tservers; |
| |
| // The maximum number of intra-location rebalancing sessions that can be run |
| // in parallel. Value of 0 means 'the number of CPU cores at the node'. |
| size_t intra_location_rebalancing_concurrency; |
| |
| // Whether to rebalance ranges of a table. |
| bool enable_range_rebalancing; |
| }; |
| |
| // Represents a concrete move of a replica from one tablet server to another. |
| // Formed logically from a TableReplicaMove by specifying a tablet for the move. |
| struct ReplicaMove { |
| std::string tablet_uuid; |
| std::string ts_uuid_from; |
| std::string ts_uuid_to; |
| std::optional<int64_t> config_opid_idx; // for CAS-enabled Raft changes |
| }; |
| |
| enum class RunStatus { |
| UNKNOWN, |
| CLUSTER_IS_BALANCED, |
| TIMED_OUT, |
| }; |
| |
| // A helper type: key is tablet UUID which corresponds to value.tablet_uuid. |
| typedef std::unordered_map<std::string, ReplicaMove> MovesInProgress; |
| |
| explicit Rebalancer(Config config); |
| |
| // Filter the list of candidate tablets to make sure the location |
| // of the destination server would not contain the majority of replicas |
| // after the move. The 'tablet_ids' is an in-out parameter. |
| static Status FilterCrossLocationTabletCandidates( |
| const std::unordered_map<std::string, std::string>& location_by_ts_id, |
| const TabletsPlacementInfo& placement_info, |
| const TableReplicaMove& move, |
| std::vector<std::string>* tablet_ids); |
| |
| // Given high-level move-some-tablet-replica-for-a-table information from the |
| // rebalancing algorithm, find appropriate tablet replicas to move between the |
| // specified tablet servers. The set of result tablet UUIDs is output |
| // into the 'tablet_ids' container (note: the container is first cleared). |
| // The source and destination replicas can then be determined by the elements |
| // of the 'tablet_ids' container and tablet server UUIDs TableReplicaMove::from |
| // and TableReplica::to correspondingly. If no suitable tablet replicas are found, |
| // 'tablet_ids' will be empty. |
| void FindReplicas(const TableReplicaMove& move, |
| const ClusterRawInfo& raw_info, |
| std::vector<std::string>* tablet_ids); |
| |
| // Convert the 'raw' information about the cluster into information suitable |
| // for the input of the high-level rebalancing algorithm. |
| // The 'moves_in_progress' parameter contains information on the replica moves |
| // which have been scheduled by a caller and still in progress: those are |
| // considered as successfully completed and applied to the 'raw_info' when |
| // building ClusterBalanceInfo for the specified 'raw_info' input. The idea |
| // is to prevent the algorithm outputting the same moves again while some |
| // of the moves recommended at prior steps are still in progress. |
| // The result cluster balance information is output into the 'info' parameter. |
| // The 'info' output parameter cannot be null. |
| Status BuildClusterInfo(const ClusterRawInfo& raw_info, |
| const MovesInProgress& moves_in_progress, |
| ClusterInfo* info) const; |
| |
| struct TabletInfo { |
| std::string tablet_id; |
| std::optional<int64_t> config_idx; // For CAS-like change of Raft configs. |
| }; |
| |
| // Mapping tserver UUID to tablets on it. |
| typedef std::unordered_map<std::string, std::vector<TabletInfo>> TServersToEmptyMap; |
| void GetTServersToEmpty(const ClusterRawInfo& raw_info, |
| std::unordered_set<std::string>* tservers_to_empty) const; |
| static void BuildTServersToEmptyInfo(const ClusterRawInfo& raw_info, |
| const MovesInProgress& moves_in_progress, |
| const std::unordered_set<std::string>& tservers_to_empty, |
| TServersToEmptyMap* tservers_to_empty_map); |
| |
| protected: |
| // Helper class to find and schedule next available rebalancing move operation |
| // and track already scheduled ones. |
| class Runner { |
| public: |
| virtual ~Runner() = default; |
| |
| // Initialize instance of Runner so it can run against Kudu cluster with |
| // the 'master_addresses' RPC endpoints. |
| virtual Status Init(std::vector<std::string> master_addresses) = 0; |
| |
| // Load information on the prescribed replica movement operations. Also, |
| // populate helper containers and other auxiliary run-time structures |
| // used by ScheduleNextMove(). This method is called with every batch |
| // of move operations output by the rebalancing algorithm once previously |
| // loaded moves have been scheduled. |
| virtual void LoadMoves(std::vector<Rebalancer::ReplicaMove> replica_moves) = 0; |
| |
| // Schedule next replica move. Returns 'true' if replica move operation |
| // has been scheduled successfully; otherwise returns 'false' and sets |
| // the 'has_errors' and 'timed_out' parameters accordingly. |
| virtual bool ScheduleNextMove(bool* has_errors, bool* timed_out) = 0; |
| |
| // Update statuses and auxiliary information on in-progress replica move |
| // operations. |
| // The 'timed_out' parameter is set to 'true' if not all in-progress |
| // operations were processed by the deadline specified by the 'deadline_' |
| // member field. |
| // The 'has_errors' parameter is set to 'true' if there were errors while |
| // trying to get the statuses of in-progress operations. |
| // The 'has_pending_moves' parameter is set to 'true' if there were any |
| // in-progress operations and we could check their statuses. |
| // The method returns 'true' if at least one in-progress move operation was |
| // completed (success or failure). |
| virtual bool UpdateMovesInProgressStatus(bool* has_errors, |
| bool* timed_out, |
| bool* has_pending_moves) = 0; |
| |
| virtual Status GetNextMoves(bool* has_moves) = 0; |
| |
| virtual uint32_t moves_count() const = 0; |
| }; // class Runner |
| |
| friend class KsckResultsToClusterBalanceInfoTest; |
| |
| // Filter move operations in 'replica_moves': remove all operations that would |
| // involve moving replicas of tablets which are in 'scheduled_moves'. The |
| // 'replica_moves' cannot be null. |
| static void FilterMoves(const MovesInProgress& scheduled_moves, |
| std::vector<ReplicaMove>* replica_moves); |
| |
| // Configuration for the rebalancer. |
| const Config config_; |
| }; |
| |
// Holds extra per-tablet information: the tablet's replication factor and
// the number of voters (presumably the voter replicas of the tablet).
//
// Both fields are zero-initialized by default, so a default-constructed
// object never carries indeterminate values. The struct remains an aggregate,
// hence TabletExtraInfo{replication_factor, num_voters} keeps working.
struct TabletExtraInfo {
  int replication_factor = 0;
  int num_voters = 0;
};
| |
| // Populate a 'tablet_id' --> 'target tablet replication factor' map. |
| void BuildTabletExtraInfoMap( |
| const ClusterRawInfo& raw_info, |
| std::unordered_map<std::string, TabletExtraInfo>* extra_info_by_tablet_id); |
| |
| // For a given table, find a tablet replica on a specified tserver to move |
| // to another. Given the requested 'move', shuffle the table's tablet_ids |
| // and select the first tablet that doesn't currently have any replicas in |
| // 'tablets_in_move'. Add the tablet's id to 'tablets_in_move' and add |
| // information about this move to 'replica_moves'. If the chosen tablet is |
| // overreplicated, no destination tserver is specified, in case it is better |
| // to just remove it from the replica distribution entirely. |
| Status SelectReplicaToMove( |
| const TableReplicaMove& move, |
| const std::unordered_map<std::string, TabletExtraInfo>& extra_info_by_tablet_id, |
| std::mt19937* random_generator, |
| std::vector<std::string> tablet_ids, |
| std::unordered_set<std::string>* tablets_in_move, |
| std::vector<Rebalancer::ReplicaMove>* replica_moves); |
| |
| } // namespace rebalance |
| } // namespace kudu |