// blob: 69d0d8b9e09e54a7881f7c48e1259f1bc30f6fa3 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <condition_variable> // IWYU pragma: keep
#include <cstdint>
#include <ctime>
#include <iosfwd>
#include <map>
#include <memory>
#include <mutex>
#include <optional>
#include <random>
#include <set>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include "kudu/client/shared_ptr.h" // IWYU pragma: keep
#include "kudu/rebalance/rebalance_algo.h"
#include "kudu/rebalance/rebalancer.h"
#include "kudu/tools/ksck.h"
#include "kudu/util/locks.h"
#include "kudu/util/monotime.h" // IWYU pragma: keep
#include "kudu/util/status.h"
namespace kudu {
namespace client {
class KuduClient;
}
namespace tools {
struct KsckResults;
// A class implementing logic for Kudu cluster rebalancing.
// This class inherits from rebalance::Rebalancer but also
// implements additional functions to print cluster balance
// information.
class RebalancerTool : public rebalance::Rebalancer {
public:
// Create a RebalancerTool object with the specified configuration.
// The constructor is explicit, so a Config is never implicitly converted
// into a RebalancerTool.
explicit RebalancerTool(const Config& config);
// Print the stats on the cluster balance information into the 'out' stream.
Status PrintStats(std::ostream& out);
// Run the rebalancing: start the process and return once the balancing
// criteria are satisfied or if an error occurs. The number of attempted
// moves is output into the 'moves_count' parameter (if the parameter is
// not null; it is optional and defaults to nullptr). The 'result_status'
// output parameter cannot be null.
Status Run(RunStatus* result_status, size_t* moves_count = nullptr);
private:
// Common base for a few Runner implementations. It holds the state shared
// by the concrete runners declared below: the back-pointer to the owning
// RebalancerTool, the per-run limits, the Kudu client, and the bookkeeping
// containers for scheduled move operations.
class BaseRunner : public Runner {
public:
// The 'rebalancer' is a pointer to the owning RebalancerTool object.
// The 'ignored_tservers' specifies tablet servers that could be
// ignored by rebalancer.
// The 'max_moves_per_server' specifies the maximum number of operations
// per tablet server (both the source and the destination are counted in).
// The 'deadline' specifies the deadline for the run, 'std::nullopt'
// if no timeout is set.
BaseRunner(RebalancerTool* rebalancer,
std::unordered_set<std::string> ignored_tservers,
size_t max_moves_per_server,
std::optional<MonoTime> deadline);
// Overrides of the Runner interface.
// NOTE(review): Init() presumably stores 'master_addresses' into
// 'master_addresses_' and sets up 'client_'; GetNextMoves() presumably
// reports via 'has_moves' whether GetNextMovesImpl() produced any moves.
// Both are defined in the implementation file -- confirm there.
Status Init(std::vector<std::string> master_addresses) override;
Status GetNextMoves(bool* has_moves) override;
// Return the current value of the 'moves_count_' counter.
uint32_t moves_count() const override {
return moves_count_;
}
protected:
// Get next batch of replica moves from the rebalancing algorithm.
// Essentially, it runs ksck against the cluster and feeds the data into the
// rebalancing algorithm along with the information on currently pending
// replica movement operations. The information returned by the high-level
// rebalancing algorithm is translated into particular replica movement
// instructions, which are used to populate the 'moves' parameter
// (the container is cleared first).
virtual Status GetNextMovesImpl(std::vector<Rebalancer::ReplicaMove>* moves) = 0;
// Update the helper containers once a scheduled operation is complete
// (i.e. succeeded or failed).
void UpdateOnMoveCompleted(const std::string& ts_uuid);
// Check if all the tablet servers (excluding those specified in
// 'ignored_tservers_') are healthy and available for replica placement.
Status CheckTabletServers(const rebalance::ClusterRawInfo& raw_info);
// A pointer to the Rebalancer object.
RebalancerTool* rebalancer_;
// A set of ignored tablet server UUIDs.
const std::unordered_set<std::string> ignored_tservers_;
// Maximum allowed number of move operations per server. For a move
// operation, a source replica adds +1 at the source server and the target
// replica adds +1 at the destination server.
const size_t max_moves_per_server_;
// Deadline for the activity performed by the Runner class in
// ScheduleNextMove() and UpdateMovesInProgressStatus() methods.
const std::optional<MonoTime> deadline_;
// Client object to make queries to Kudu masters for various auxiliary info
// while scheduling move operations and monitoring their status.
client::sp::shared_ptr<client::KuduClient> client_;
// Information on scheduled replica movement operations; keys are
// tablet UUIDs, values are ReplicaMove structures.
Rebalancer::MovesInProgress scheduled_moves_;
// Number of successfully completed replica move operations.
// No in-class initializer: the initial value is set by the constructor,
// which is defined in the implementation file.
uint32_t moves_count_;
// Kudu cluster RPC end-points.
std::vector<std::string> master_addresses_;
// Mapping 'tserver UUID' --> 'scheduled move operations count'.
std::unordered_map<std::string, int32_t> op_count_per_ts_;
// Mapping 'scheduled move operations count' --> 'tserver UUID'. That's
// just reversed 'op_count_per_ts_'. A multimap is used since several
// tablet servers may have the same operation count.
std::multimap<int32_t, std::string> ts_per_op_count_;
}; // class BaseRunner
// Runner that leverages RebalancingAlgo interface for rebalancing.
// Concrete subclasses supply the algorithm and the target location via the
// pure virtual algorithm() and location() methods.
class AlgoBasedRunner : public BaseRunner {
public:
// The 'ignored_tservers' specifies tablet servers that could be
// ignored by rebalancer.
// The 'max_moves_per_server' specifies the maximum number of operations
// per tablet server (both the source and the destination are counted in).
// The 'deadline' specifies the deadline for the run, 'std::nullopt'
// if no timeout is set.
// The location to operate on is not a constructor parameter: it is
// provided by the location() method; if that is std::nullopt, rebalance
// across locations.
AlgoBasedRunner(RebalancerTool* rebalancer,
std::unordered_set<std::string> ignored_tservers,
size_t max_moves_per_server,
std::optional<MonoTime> deadline);
// Overrides of the Runner/BaseRunner interface; the implementations are
// in the implementation file.
Status Init(std::vector<std::string> master_addresses) override;
void LoadMoves(std::vector<Rebalancer::ReplicaMove> replica_moves) override;
bool ScheduleNextMove(bool* has_errors, bool* timed_out) override;
bool UpdateMovesInProgressStatus(bool* has_errors,
bool* timed_out,
bool* has_pending_moves) override;
// Get the cluster location the runner is slated to run/running at.
// 'std::nullopt' means all the cluster.
virtual const std::optional<std::string>& location() const = 0;
// Rebalancing algorithm that the runner uses to find replica moves.
virtual rebalance::RebalancingAlgo* algorithm() = 0;
protected:
// See BaseRunner::GetNextMovesImpl(); the output parameter is named
// 'replica_moves' in this override.
Status GetNextMovesImpl(std::vector<Rebalancer::ReplicaMove>* replica_moves) override;
// Using the helper containers src_op_indices_ and dst_op_indices_,
// find the index of the most optimal replica movement operation
// and output the index into the 'op_idx' parameter.
// NOTE(review): the boolean result presumably tells whether a suitable
// operation has been found -- confirm in the implementation.
bool FindNextMove(size_t* op_idx);
// Update the helper containers once a move operation has been scheduled.
void UpdateOnMoveScheduled(size_t idx,
const std::string& tablet_uuid,
const std::string& src_ts_uuid,
const std::string& dst_ts_uuid,
bool is_success);
// Auxiliary method used by UpdateOnMoveScheduled() implementation.
// The 'op_indices' parameter is a pointer to one of the per-tserver index
// containers having the same type as src_op_indices_ and dst_op_indices_.
void UpdateOnMoveScheduledImpl(
size_t idx,
const std::string& ts_uuid,
bool is_success,
std::unordered_map<std::string, std::set<size_t>>* op_indices);
// The moves to schedule.
std::vector<Rebalancer::ReplicaMove> replica_moves_;
// Mapping 'tserver UUID' --> 'indices of move operations having the
// tserver UUID (i.e. the key) as the source of the move operation'.
std::unordered_map<std::string, std::set<size_t>> src_op_indices_;
// Mapping 'tserver UUID' --> 'indices of move operations having the
// tserver UUID (i.e. the key) as the destination of the move operation'.
std::unordered_map<std::string, std::set<size_t>> dst_op_indices_;
// Random device and generator for selecting among multiple choices, when
// appropriate.
std::random_device random_device_;
std::mt19937 random_generator_;
}; // class AlgoBasedRunner
// AlgoBasedRunner that uses rebalance::TwoDimensionalGreedyAlgo as its
// algorithm and reports the location kept in the 'location_' member.
class IntraLocationRunner : public AlgoBasedRunner {
public:
// The 'ignored_tservers' specifies tablet servers that could be
// ignored by rebalancer.
// The 'max_moves_per_server' specifies the maximum number of operations
// per tablet server (both the source and the destination are counted in).
// The 'deadline' specifies the deadline for the run, 'std::nullopt'
// if no timeout is set. In case of non-location aware cluster or if there
// is just one location defined in the whole cluster, the whole cluster will
// be rebalanced.
// The 'location' specifies the location to run at.
// NOTE(review): presumably 'location' is stored into 'location_' by the
// constructor, which is defined in the implementation file -- confirm.
IntraLocationRunner(RebalancerTool* rebalancer,
std::unordered_set<std::string> ignored_tservers,
size_t max_moves_per_server,
std::optional<MonoTime> deadline,
std::string location);
// Return a pointer to the algorithm instance owned by this runner.
rebalance::RebalancingAlgo* algorithm() override {
return &algorithm_;
}
// Return a reference to the location this runner is associated with.
const std::optional<std::string>& location() const override {
return location_;
}
private:
// The location reported by location(); immutable after construction.
const std::optional<std::string> location_;
// An instance of the balancing algorithm.
rebalance::TwoDimensionalGreedyAlgo algorithm_;
};
// AlgoBasedRunner that uses rebalance::LocationBalancingAlgo as its
// algorithm. Its 'location_' member defaults to std::nullopt.
class CrossLocationRunner : public AlgoBasedRunner {
public:
// The 'ignored_tservers' specifies tablet servers that could be
// ignored by rebalancer.
// The 'max_moves_per_server' specifies the maximum number of operations
// per tablet server (both the source and the destination are counted in).
// The 'load_imbalance_threshold' specifies the threshold for the
// balancing algorithm used for finding the most optimal replica movements.
// The 'deadline' specifies the deadline for the run, 'std::nullopt'
// if no timeout is set.
CrossLocationRunner(RebalancerTool* rebalancer,
std::unordered_set<std::string> ignored_tservers,
size_t max_moves_per_server,
double load_imbalance_threshold,
std::optional<MonoTime> deadline);
// Return a pointer to the algorithm instance owned by this runner.
rebalance::RebalancingAlgo* algorithm() override {
return &algorithm_;
}
// Return a reference to 'location_'.
const std::optional<std::string>& location() const override {
return location_;
}
private:
// The location reported by location(); std::nullopt by default, which
// per AlgoBasedRunner::location() means all the cluster.
const std::optional<std::string> location_ = std::nullopt;
// An instance of the balancing algorithm.
rebalance::LocationBalancingAlgo algorithm_;
};
// Runner that leverages 'SetReplace' method to move replicas.
class ReplaceBasedRunner : public BaseRunner {
public:
// The 'ignored_tservers' specifies tablet servers that could be
// ignored by rebalancer.
// The 'max_moves_per_server' specifies the maximum number of operations
// per tablet server (both the source and the destination are counted in).
// The 'load_imbalance_threshold' specified the threshold for the
// balancing algorithm used for finding the most optimal replica movements.
// The 'deadline' specifies the deadline for the run, 'std::nullopt'
// if no timeout is set.
ReplaceBasedRunner(RebalancerTool* rebalancer,
std::unordered_set<std::string> ignored_tservers,
size_t max_moves_per_server,
std::optional<MonoTime> deadline);
Status Init(std::vector<std::string> master_addresses) override;
void LoadMoves(std::vector<Rebalancer::ReplicaMove> replica_moves) override;
bool ScheduleNextMove(bool* has_errors, bool* timed_out) override;
bool UpdateMovesInProgressStatus(bool* has_errors,
bool* timed_out,
bool* has_pending_moves) override;
protected:
// Key is tserver UUID which corresponds to value.ts_uuid_from.
typedef std::unordered_multimap<std::string, Rebalancer::ReplicaMove> MovesToSchedule;
Status GetNextMovesImpl(std::vector<Rebalancer::ReplicaMove>* replica_moves) override;
virtual Status GetReplaceMoves(const rebalance::ClusterInfo& ci,
const rebalance::ClusterRawInfo& raw_info,
std::vector<Rebalancer::ReplicaMove>* replica_moves) = 0;
bool FindNextMove(Rebalancer::ReplicaMove* move);
// Update the helper containers once a move operation has been scheduled.
void UpdateOnMoveScheduled(Rebalancer::ReplicaMove move);
// Moves yet to schedule.
MovesToSchedule moves_to_schedule_;
};
// ReplaceBasedRunner whose GetReplaceMoves() produces the moves needed to
// restore the placement policy restrictions.
class PolicyFixer : public ReplaceBasedRunner {
public:
// The parameters have the same meaning as for ReplaceBasedRunner's
// constructor.
PolicyFixer(RebalancerTool* rebalancer,
std::unordered_set<std::string> ignored_tservers,
size_t max_moves_per_server,
std::optional<MonoTime> deadline);
private:
// Get replica moves to restore the placement policy restrictions.
// If it returns Status::OK() with 'replica_moves' empty, the distribution
// of tablet replicas is considered to conform to the main constraint of the
// placement policy.
Status GetReplaceMoves(const rebalance::ClusterInfo& ci,
const rebalance::ClusterRawInfo& raw_info,
std::vector<Rebalancer::ReplicaMove>* replica_moves) override;
};
// ReplaceBasedRunner whose GetReplaceMoves() produces the moves needed to
// move replicas away from healthy ignored tablet servers.
class IgnoredTserversRunner : public ReplaceBasedRunner {
 public:
  // The parameters have the same meaning as for ReplaceBasedRunner's
  // constructor.
  IgnoredTserversRunner(RebalancerTool* rebalancer,
                        std::unordered_set<std::string> ignored_tservers,
                        size_t max_moves_per_server,
                        std::optional<MonoTime> deadline);

 private:
  // The MovesToSchedule type alias is inherited (as a protected member) from
  // ReplaceBasedRunner, so it is intentionally not re-declared here: an
  // identical private re-declaration would only hide the inherited name.

  // Get replica moves to move replicas from healthy ignored tservers.
  // If it returns Status::OK() with 'replica_moves' empty, there would be
  // no replicas on the healthy ignored tservers.
  Status GetReplaceMoves(
      const rebalance::ClusterInfo& ci,
      const rebalance::ClusterRawInfo& raw_info,
      std::vector<Rebalancer::ReplicaMove>* replica_moves) override;

  // Return Status::OK() if it's safe to move all replicas from the ignored
  // tablet servers to other servers and all the tablet servers that need to
  // be emptied are in maintenance mode.
  static Status CheckIgnoredTServers(
      const rebalance::ClusterRawInfo& raw_info,
      const rebalance::ClusterInfo& ci,
      const std::unordered_set<std::string>& tservers_to_empty);

  // Output into 'replica_moves' the moves for the tablet servers described
  // by 'ignored_tservers_info'.
  // NOTE(review): the selection logic lives in the implementation file;
  // presumably it uses 'random_generator_' below -- confirm there.
  void GetMovesFromIgnoredTservers(
      const TServersToEmptyMap& ignored_tservers_info,
      std::vector<Rebalancer::ReplicaMove>* replica_moves);

  // Random device and generator for selecting among multiple choices, when
  // appropriate.
  std::random_device random_device_;
  std::mt19937 random_generator_;
};
// Convert ksck results into information relevant to rebalancing the cluster
// at the location specified by 'location' parameter ('std::nullopt' for
// 'location' means that's about cross-location rebalancing). Basically,
// 'raw' information is just a sub-set of relevant fields of the KsckResults
// structure filtered to contain information only for the specified location.
// The input is read from 'ksck_info'; the result is output into 'raw_info'.
Status KsckResultsToClusterRawInfo(const std::optional<std::string>& location,
const KsckResults& ksck_info,
rebalance::ClusterRawInfo* raw_info);
// Print replica count information about the tablet servers that need to be
// emptied into the 'out' stream.
Status PrintIgnoredTserversStats(const rebalance::ClusterRawInfo& raw_info,
std::ostream& out) const;
// Print information on the cross-location balance into the 'out' stream.
Status PrintCrossLocationBalanceStats(const rebalance::ClusterInfo& ci,
std::ostream& out) const;
// Print statistics for the specified location. If 'location' is an empty
// string, that's about printing the cluster-wide stats for a cluster that
// doesn't have any locations defined.
Status PrintLocationBalanceStats(const std::string& location,
const rebalance::ClusterRawInfo& raw_info,
const rebalance::ClusterInfo& ci,
std::ostream& out) const;
// NOTE(review): judging by the name and the signature, this prints
// information on placement policy violations found in 'raw_info' into the
// 'out' stream -- confirm against the implementation.
Status PrintPolicyViolationInfo(const rebalance::ClusterRawInfo& raw_info,
std::ostream& out) const;
// Run rebalancing using the specified runner.
// NOTE(review): 'result_status' is presumably an output parameter with the
// same meaning as in Run() -- confirm against the implementation.
Status RunWith(Runner* runner, RunStatus* result_status);
// Refresh the information on the cluster for the specified location
// (involves running ksck). The information is output into 'raw_info'.
Status GetClusterRawInfo(const std::optional<std::string>& location,
rebalance::ClusterRawInfo* raw_info);
// Reset ksck-related fields and run ksck against the cluster.
Status RefreshKsckResults();
// Auxiliary Ksck object to get information on the cluster.
std::unique_ptr<Ksck> ksck_; // protected by ksck_lock_
// Lock guarding access to 'ksck_'.
rw_spinlock ksck_lock_;
// State of the ksck refresh; both fields are guarded by
// 'ksck_refresh_lock_'. 'ksck_refreshing_' starts out as false.
bool ksck_refreshing_{false}; // protected by ksck_refresh_lock_
Status ksck_refresh_status_; // protected by ksck_refresh_lock_
// Mutex guarding 'ksck_refreshing_' and 'ksck_refresh_status_'.
std::mutex ksck_refresh_lock_;
// NOTE(review): presumably used together with 'ksck_refresh_lock_' to wait
// for and signal the completion of an in-progress ksck refresh -- confirm
// in the implementation file.
std::condition_variable ksck_refresh_cv_;
};
} // namespace tools
} // namespace kudu