blob: 6d2c6091f64a0c080af0472d0f75d2088d209b00 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/master/auto_leader_rebalancer.h"
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <map>
#include <optional>
#include <ostream>
#include <string>
#include <type_traits>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "kudu/common/wire_protocol.h"
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/consensus.proxy.h"
#include "kudu/consensus/metadata.pb.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/master/catalog_manager.h"
#include "kudu/master/master.pb.h"
#include "kudu/master/ts_descriptor.h"
#include "kudu/master/ts_manager.h"
#include "kudu/rpc/messenger.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/security/init.h"
#include "kudu/util/cow_object.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/status.h"
#include "kudu/util/thread.h"
using kudu::consensus::ConsensusServiceProxy;
using kudu::consensus::LeaderStepDownMode;
using kudu::consensus::LeaderStepDownRequestPB;
using kudu::consensus::LeaderStepDownResponsePB;
using kudu::consensus::RaftPeerPB;
using kudu::rpc::MessengerBuilder;
using kudu::rpc::RpcController;
using std::map;
using std::nullopt;
using std::string;
using std::unordered_set;
using std::vector;
using strings::Substitute;
// Timeout applied to every LeaderStepDown RPC that the rebalancer sends to a
// current leader.
DEFINE_uint32(auto_leader_rebalancing_rpc_timeout_seconds, 10,
"auto leader rebalancing send leader step down rpc timeout seconds");
TAG_FLAG(auto_leader_rebalancing_rpc_timeout_seconds, advanced);
TAG_FLAG(auto_leader_rebalancing_rpc_timeout_seconds, runtime);
// Time RunLoop() waits before each rebalancing round; it is re-read on every
// iteration of the loop.
DEFINE_uint32(auto_leader_rebalancing_interval_seconds, 3600,
"How long to sleep in between auto leader rebalancing cycles, before checking "
"the cluster again to see if there is leader skew and if run task again.");
TAG_FLAG(auto_leader_rebalancing_interval_seconds, advanced);
TAG_FLAG(auto_leader_rebalancing_interval_seconds, runtime);
// Upper bound on the number of leader transfers scheduled at once.
// NOTE(review): the limit is applied to the task list built inside a single
// RunLeaderRebalanceForTable() call, i.e. per table, not across all tables of
// one round.
DEFINE_uint32(leader_rebalancing_max_moves_per_round, 10,
"Max count of leader transfer when every leader rebalance runs");
TAG_FLAG(leader_rebalancing_max_moves_per_round, advanced);
TAG_FLAG(leader_rebalancing_max_moves_per_round, runtime);
// When true, tables are listed with CatalogManager::GetNormalizedTables()
// instead of GetAllTables(); presumably the former omits soft-deleted tables —
// confirm against CatalogManager.
DEFINE_bool(leader_rebalancing_ignore_soft_deleted_tables, false,
"Whether to ignore rebalancing the soft deleted tables");
TAG_FLAG(leader_rebalancing_ignore_soft_deleted_tables, advanced);
TAG_FLAG(leader_rebalancing_ignore_soft_deleted_tables, runtime);
// Defined elsewhere; RunLoop() only starts a rebalancing round while it is true.
DECLARE_bool(auto_leader_rebalancing_enabled);
namespace kudu {
namespace master {
// Creates the rebalancer task without starting it; Init() creates the
// background thread.
//
// 'catalog_manager' and 'ts_manager' are stored as raw pointers, so they must
// outlive this object (presumably both are owned by the master — confirm).
// 'shutdown_' starts with a count of 1; Shutdown() counts it down to stop
// RunLoop(). The shuffle generator is seeded from 'random_device_', so
// 'random_device_' must be declared before 'random_generator_' in the header.
AutoLeaderRebalancerTask::AutoLeaderRebalancerTask(CatalogManager* catalog_manager,
TSManager* ts_manager)
: catalog_manager_(catalog_manager),
ts_manager_(ts_manager),
shutdown_(1),
random_generator_(random_device_()),
number_of_loop_iterations_for_test_(0),
moves_scheduled_this_round_for_test_(0) {}
// Stops the background thread if it is still running.
AutoLeaderRebalancerTask::~AutoLeaderRebalancerTask() {
  // Shutdown() CHECK-fails when no thread exists (never initialized, or
  // already shut down), so only call it while 'thread_' is set.
  if (!thread_) {
    return;
  }
  Shutdown();
}
// Builds the RPC messenger used for LeaderStepDown calls and starts the
// background thread running RunLoop().
//
// Returns the error from building the messenger or creating the thread, if any.
Status AutoLeaderRebalancerTask::Init() {
  DCHECK(!thread_) << "AutoleaderRebalancerTask is already initialized";

  MessengerBuilder messenger_builder("auto-leader-rebalancer");
  // If a username is available from the keytab login, use it as the SASL
  // protocol name for the messenger.
  const auto keytab_username = kudu::security::GetLoggedInUsernameFromKeytab();
  if (keytab_username) {
    messenger_builder.set_sasl_proto_name(keytab_username.value());
  }
  RETURN_NOT_OK(std::move(messenger_builder).Build(&messenger_));

  // RunLoop() keeps going until 'shutdown_' is counted down by Shutdown().
  return Thread::Create("catalog manager", "auto-leader-rebalancer",
                        [this]() { this->RunLoop(); }, &thread_);
}
// Signals RunLoop() to stop, then joins and releases the background thread.
// CHECK-fails if Init() did not create a thread (or it was already released).
void AutoLeaderRebalancerTask::Shutdown() {
  CHECK(thread_) << "AutoLeaderRebalancerTask is not initialized";
  // Only when CountDown() reports true do we go on to join and drop the thread.
  if (shutdown_.CountDown()) {
    CHECK_OK(ThreadJoiner(thread_.get()).Join());
    thread_.reset();
  }
}
// Rebalances leadership among the tablets of a single table.
//
// The work happens in four steps:
//   1. For every tablet of the table, record which tablet server hosts the
//      leader replica and which host the follower replicas (voters only).
//   2. For every tablet server in 'tserver_uuids' that hosts replicas, compute
//      how many leaders it should give away so that leaders end up spread as
//      evenly as possible (at most the ceiling of the remaining average).
//   3. Pick concrete <tablet, from, to> transfers. A destination is never taken
//      from 'exclude_dest_uuids', and at most
//      --leader_rebalancing_max_moves_per_round transfers are picked in total.
//   4. In ExecuteMode::TEST only check that no transfer is needed; in any other
//      mode send a GRACEFUL LeaderStepDown RPC to the current leader for each
//      picked transfer.
//
// Params:
//   table_info         - the table to rebalance.
//   tserver_uuids      - uuids of the tablet servers taken into account.
//   exclude_dest_uuids - uuids that must never be chosen as a new leader.
//   mode               - TEST only verifies balance; other modes move leaders.
//
// Returns OK when the table needs nothing, is skipped, or the RPCs were sent.
// Returns IllegalState in TEST mode when transfers would be needed. Otherwise
// returns the first error hit while fetching tablet locations, resolving a
// leader's address, or sending a LeaderStepDown RPC.
Status AutoLeaderRebalancerTask::RunLeaderRebalanceForTable(
    const scoped_refptr<TableInfo>& table_info,
    const vector<string>& tserver_uuids,
    const unordered_set<string>& exclude_dest_uuids,
    AutoLeaderRebalancerTask::ExecuteMode mode) {
  LOG(INFO) << Substitute("leader rebalance for table $0", table_info->table_name());
  TableMetadataLock table_l(table_info.get(), LockMode::READ);
  const SysTablesEntryPB& table_data = table_info->metadata().state().pb;
  const int replication_factor = table_data.num_replicas();
  DCHECK_GT(replication_factor, 0);
  // Deleted tables need no rebalancing, and a single-replica tablet has no
  // follower to hand leadership to. '<= 1' (rather than '== 1') also protects
  // release builds, where the DCHECK above is compiled out.
  if (table_data.state() == SysTablesEntryPB::REMOVED || replication_factor <= 1) {
    return Status::OK();
  }

  // tablet_id -> leader's tserver uuid (also asserts a single leader per tablet)
  map<string, string> leader_ts_uuid_by_tablet_id;
  // tablet_id -> followers' tserver uuids
  map<string, vector<string>> follower_ts_uuids_by_tablet_id;
  // tserver uuid -> ids of the tablets it leads
  map<string, vector<string>> leader_tablet_ids_by_ts_uuid;
  // tserver uuid -> ids of all tablets it hosts a leader or follower replica of
  map<string, vector<string>> tablet_ids_by_ts_uuid;
  // leader's tserver uuid -> its first RPC address
  map<string, HostPort> host_port_by_leader_ts_uuid;
  vector<scoped_refptr<TabletInfo>> tablet_infos;
  table_info->GetAllTablets(&tablet_infos);

  // Step 1. Get basic statistics.
  for (const auto& tablet : tablet_infos) {
    TabletMetadataLock tablet_l(tablet.get(), LockMode::READ);
    // Retrieve all replicas of the tablet.
    TabletLocationsPB locs_pb;
    CatalogManager::TSInfosDict ts_infos_dict;
    {
      CatalogManager::ScopedLeaderSharedLock leaderlock(catalog_manager_);
      RETURN_NOT_OK(leaderlock.first_failed_status());
      // This will only return tablet replicas in the RUNNING state, and
      // filter to only retrieve voter replicas.
      RETURN_NOT_OK(catalog_manager_->GetTabletLocations(
          tablet->id(),
          ReplicaTypeFilter::VOTER_REPLICA,
          /*use_external_addr=*/false,
          &locs_pb,
          &ts_infos_dict,
          nullopt));
    }
    // Build a summary for each replica of the tablet.
    for (const auto& r : locs_pb.interned_replicas()) {
      const int index = r.ts_info_idx();
      const TSInfoPB& ts_info = *(ts_infos_dict.ts_info_pbs()[index]);
      const string& uuid = ts_info.permanent_uuid();
      if (r.role() == RaftPeerPB::LEADER) {
        auto& leader_tablet_ids = LookupOrInsert(&leader_tablet_ids_by_ts_uuid, uuid, {});
        leader_tablet_ids.emplace_back(tablet->id());
        InsertOrDie(&leader_ts_uuid_by_tablet_id, tablet->id(), uuid);
        InsertIfNotPresent(
            &host_port_by_leader_ts_uuid, uuid, HostPortFromPB(ts_info.rpc_addresses(0)));
      } else if (r.role() == RaftPeerPB::FOLLOWER) {
        auto& follower_uuids = LookupOrInsert(&follower_ts_uuids_by_tablet_id, tablet->id(), {});
        follower_uuids.emplace_back(uuid);
      } else {
        LOG(WARNING) << Substitute("tablet_id $0, permanent_uuid $1, not a VOTER, role: $2",
                                   tablet->id(),
                                   uuid,
                                   RaftPeerPB::Role_Name(r.role()));
        continue;
      }
      auto& uuid_replicas = LookupOrInsert(&tablet_ids_by_ts_uuid, uuid, {});
      uuid_replicas.emplace_back(tablet->id());
    }
  }

  // Step 2.
  // Decide how many leaders each tablet server should give away. Servers are
  // visited in 'tserver_uuids' order; each one is allowed at most the ceiling of
  // (remaining tablets / remaining tservers) leaders, and anything above that is
  // recorded as excess to transfer.
  // tserver uuid -> <number of replicas, number of leaders>
  map<string, std::pair<int32_t, int32_t>> replica_and_leader_count_by_ts_uuid;
  // tserver uuid -> number of leaders it should transfer away
  map<string, int32_t> leader_transfer_source;
  size_t remaining_tablets = tablet_infos.size();
  size_t remaining_tservers = tserver_uuids.size();
  for (const auto& uuid : tserver_uuids) {
    const auto* tablet_ids_ptr = FindOrNull(tablet_ids_by_ts_uuid, uuid);
    const int32_t replica_count =
        tablet_ids_ptr ? static_cast<int32_t>(tablet_ids_ptr->size()) : 0;
    if (replica_count == 0) {
      // No replicas (and so no leaders), e.g. a tserver that joined the cluster
      // just now: skip it.
      remaining_tservers--;
      continue;
    }
    const auto* leader_tablet_ids_ptr = FindOrNull(leader_tablet_ids_by_ts_uuid, uuid);
    const int32_t leader_count =
        leader_tablet_ids_ptr ? static_cast<int32_t>(leader_tablet_ids_ptr->size()) : 0;
    replica_and_leader_count_by_ts_uuid.insert(
        {uuid, std::pair<int32_t, int32_t>(replica_count, leader_count)});
    VLOG(1) << Substitute(
        "uuid: $0, replica_count: $1, leader_count: $2", uuid, replica_count, leader_count);
    // If the number of remaining tablets is divisible by the number of remaining tablet
    // servers, the leader num of all the remaining tablet servers should be the division
    // result.
    // Else, the maximum number of leader replicas per tablet server should be the ceil value
    // of the average leaders num. Transfer the excess leaders if a tablet server's
    // leader num is more than that.
    const uint32_t target_leader_count =
        (remaining_tablets + remaining_tservers - 1) / remaining_tservers;
    const int32_t should_transfer_count =
        leader_count - static_cast<int32_t>(target_leader_count);
    if (should_transfer_count > 0) {
      leader_transfer_source.insert({uuid, should_transfer_count});
      VLOG(1) << Substitute("$0 should transfer leader count: $1", uuid, should_transfer_count);
    }
    if (remaining_tablets % remaining_tservers == 0) {
      remaining_tablets -= target_leader_count;
    } else {
      remaining_tablets -= (should_transfer_count >= 0 ? target_leader_count :
                                                         (target_leader_count - 1));
    }
    remaining_tservers--;
  }

  // Step 3.
  // Generate transfer tasks: tablet_id -> <from_uuid, to_uuid>.
  // Read the runtime-modifiable limit once so the whole call uses one value.
  const uint32_t max_moves = FLAGS_leader_rebalancing_max_moves_per_round;
  map<string, std::pair<string, string>> leader_transfer_tasks;
  for (const auto& from_info : leader_transfer_source) {
    const string& leader_uuid = from_info.first;
    const int32_t need_transfer_count = from_info.second;
    int32_t pick_count = 0;
    vector<string>& uuid_leaders = leader_tablet_ids_by_ts_uuid[leader_uuid];
    std::shuffle(uuid_leaders.begin(), uuid_leaders.end(), random_generator_);
    // Each led tablet yields at most one transfer task, moving leadership to a
    // chosen follower.
    for (const string& tablet_id : uuid_leaders) {
      // Checking the limit before picking also makes a limit of 0 schedule nothing.
      if (leader_transfer_tasks.size() >= max_moves) {
        break;
      }
      const vector<string>* uuid_followers =
          FindOrNull(follower_ts_uuids_by_tablet_id, tablet_id);
      // Skip tablets whose leader plus reported followers are fewer than the
      // replication factor ('replication_factor' > 1 here, so the cast is safe).
      if (!uuid_followers ||
          uuid_followers->size() + 1 < static_cast<size_t>(replication_factor)) {
        continue;
      }
      // Pick the eligible follower with the lowest leaders/replicas ratio.
      // 'min_score' starts at 1 and the comparison is strict, so a follower with
      // a ratio of 1 or more is never picked.
      string dest_follower_uuid;
      double min_score = 1;
      for (const string& follower_uuid : *uuid_followers) {
        if (ContainsKey(exclude_dest_uuids, follower_uuid)) {
          continue;
        }
        const auto* follower_counts =
            FindOrNull(replica_and_leader_count_by_ts_uuid, follower_uuid);
        if (!follower_counts || follower_counts->first <= 0) {
          // No statistics from step 2 for this follower's tserver (e.g. it is not
          // in 'tserver_uuids'): skip the whole tablet instead of guessing.
          dest_follower_uuid.clear();
          break;
        }
        // double is not precise.
        const double score =
            static_cast<double>(follower_counts->second) / follower_counts->first;
        if (score < min_score) {
          min_score = score;
          dest_follower_uuid = follower_uuid;
        }
      }
      if (dest_follower_uuid.empty()) {
        continue;
      }
      // 'leader_uuid' is in 'leader_transfer_source', so step 2 recorded a
      // non-zero replica count for it.
      std::pair<int32_t, int32_t>& leader_counts =
          replica_and_leader_count_by_ts_uuid[leader_uuid];
      const double leader_score =
          static_cast<double>(leader_counts.second) / leader_counts.first;
      if (min_score > leader_score) {
        // Skip it, because the transfer will cause more leader skew.
        continue;
      }
      leader_transfer_tasks.emplace(tablet_id, std::make_pair(leader_uuid, dest_follower_uuid));
      leader_counts.second--;
      replica_and_leader_count_by_ts_uuid[dest_follower_uuid].second++;
      if (++pick_count == need_transfer_count) {
        // Have picked enough leader transfer tasks for this tserver.
        break;
      }
    }
    if (leader_transfer_tasks.size() >= max_moves) {
      VLOG(1) << Substitute(
          "leader rebalance reach the upper limit: $0, try do left leader transfer tasks next "
          "time", max_moves);
      // Stop visiting further source tservers; otherwise each of them would still
      // get a transfer scheduled and the limit would be exceeded.
      break;
    }
  }

  if (mode == AutoLeaderRebalancerTask::ExecuteMode::TEST) {
    if (!leader_transfer_tasks.empty()) {
      return Status::IllegalState(Substitute("leader_transfer_task size should be 0, but $0",
                                             leader_transfer_tasks.size()));
    }
    return Status::OK();
  }
  moves_scheduled_this_round_for_test_ = leader_transfer_tasks.size();
  VLOG(1) << Substitute("leader rebalance tasks, size: $0, leader_transfer_source, size: $1",
                        moves_scheduled_this_round_for_test_.load(),
                        leader_transfer_source.size());

  // Step 4. Do leader transfer tasks.
  // @TODO(duyuqi), optimal speed
  // If leader rebalancing tasks is too many, each rpc of the thread wait the response
  // synchronously, which may be very slow.
  int leader_transfer_count = 0;
  for (const auto& task : leader_transfer_tasks) {
    const string& tablet_id = task.first;
    const string& leader_uuid = task.second.first;
    const string& new_leader_uuid = task.second.second;
    const auto* host_port = FindOrNull(host_port_by_leader_ts_uuid, leader_uuid);
    if (!host_port) {
      continue;
    }
    std::shared_ptr<TSDescriptor> leader_desc;
    if (!ts_manager_->LookupTSByUUID(leader_uuid, &leader_desc)) {
      continue;
    }
    // Re-check the destination's state right before sending the RPC.
    if (PREDICT_FALSE(TServerStatePB::MAINTENANCE_MODE ==
                      ts_manager_->GetTServerState(new_leader_uuid))) {
      continue;
    }
    vector<Sockaddr> resolved;
    RETURN_NOT_OK(host_port->ResolveAddresses(&resolved));
    if (PREDICT_FALSE(resolved.empty())) {
      // Guard 'resolved[0]' below.
      LOG(WARNING) << Substitute(
          "no address resolved for $0 (tserver $1), skipping leader transfer of tablet $2",
          host_port->ToString(), leader_uuid, tablet_id);
      continue;
    }

    LeaderStepDownRequestPB request;
    // The RPC is sent to the current leader, so 'dest_uuid' is the leader itself;
    // 'new_leader_uuid' is the follower that should take over.
    request.set_dest_uuid(leader_uuid);
    request.set_tablet_id(tablet_id);
    request.set_mode(LeaderStepDownMode::GRACEFUL);
    request.set_new_leader_uuid(new_leader_uuid);
    LeaderStepDownResponsePB response;
    RpcController rpc;
    rpc.set_timeout(MonoDelta::FromSeconds(FLAGS_auto_leader_rebalancing_rpc_timeout_seconds));
    ConsensusServiceProxy proxy(messenger_, resolved[0], host_port->host());
    RETURN_NOT_OK(proxy.LeaderStepDown(request, &response, &rpc));
    leader_transfer_count++;
    if (response.has_error()) {
      LOG(WARNING) << Substitute(
          "leader transfer table: $0, tablet_id: $1, from: $2 to: $3 failed: $4",
          table_data.name(),
          tablet_id,
          leader_uuid,
          new_leader_uuid,
          StatusFromPB(response.error().status()).ToString());
    } else {
      VLOG(1) << Substitute("leader transfer table: $0, tablet_id: $1, from: $2 to: $3",
                            table_data.name(),
                            tablet_id,
                            leader_uuid,
                            new_leader_uuid);
    }
  }
  // @TODO(duyuqi)
  // Add metrics to replace the log.
  VLOG(0) << Substitute("table: $0, leader rebalance finish, leader transfer count: $1",
                        table_data.name(),
                        leader_transfer_count);
  return Status::OK();
}
// Runs one leader rebalancing round over the tables known to the catalog manager.
//
// Holds 'running_mutex_' for the whole round. If this master's catalog manager
// is not initialized or not the leader, the round is skipped and the
// scheduled-moves test counter is reset. Otherwise:
//   - tablet servers that are presumed dead are left out;
//   - tablet servers in MAINTENANCE_MODE are never chosen as new leaders;
//   - RunLeaderRebalanceForTable() is called for every table. A failure on one
//     table is logged and the remaining tables are still processed.
//
// Returns a non-OK status only when the leader lock cannot be taken while
// listing the tables.
Status AutoLeaderRebalancerTask::RunLeaderRebalancer() {
  std::lock_guard guard(running_mutex_);
  // If catalog manager isn't initialized or isn't the leader, don't do leader
  // rebalancing. Putting the auto-rebalancer to sleep shouldn't affect the
  // master's ability to become the leader. When the thread wakes up and
  // discovers it is now the leader, then it can begin auto-rebalancing.
  {
    CatalogManager::ScopedLeaderSharedLock l(catalog_manager_);
    if (!l.first_failed_status().ok()) {
      moves_scheduled_this_round_for_test_ = 0;
      return Status::OK();
    }
  }
  number_of_loop_iterations_for_test_++;

  // Leader balance does not depend on disk capacity, so every tablet server
  // that is not presumed dead takes part.
  TSDescriptorVector descriptors;
  ts_manager_->GetAllDescriptors(&descriptors);
  if (PREDICT_FALSE(descriptors.empty())) {
    VLOG(1) << "No tserver registered for now, skipping this leader rebalancing round.";
    return Status::OK();
  }
  vector<string> tserver_uuids;
  tserver_uuids.reserve(descriptors.size());
  for (const auto& e : descriptors) {
    if (e->PresumedDead()) {
      continue;
    }
    tserver_uuids.emplace_back(e->permanent_uuid());
  }

  // Avoid transferring leaders to tservers that are in MAINTENANCE_MODE.
  auto tserver_state_by_uuid = ts_manager_->GetTServerStates();
  unordered_set<string> exclude_dest_uuids;
  for (const auto& uuid_state : tserver_state_by_uuid) {
    if (uuid_state.second.first == TServerStatePB::MAINTENANCE_MODE) {
      exclude_dest_uuids.insert(uuid_state.first);
    }
  }

  vector<scoped_refptr<TableInfo>> table_infos;
  {
    CatalogManager::ScopedLeaderSharedLock leader_lock(catalog_manager_);
    RETURN_NOT_OK(leader_lock.first_failed_status());
    if (FLAGS_leader_rebalancing_ignore_soft_deleted_tables) {
      catalog_manager_->GetNormalizedTables(&table_infos);
    } else {
      catalog_manager_->GetAllTables(&table_infos);
    }
  }
  for (const auto& table_info : table_infos) {
    // Best effort per table: report the failure instead of silently dropping
    // the Status, and keep going with the other tables.
    WARN_NOT_OK(RunLeaderRebalanceForTable(table_info, tserver_uuids, exclude_dest_uuids),
                Substitute("leader rebalancing failed for table $0",
                           table_info->table_name()));
  }
  // @TODO(duyuqi)
  // Enrich the log and add metrics for leader rebalancer.
  LOG(INFO) << "All tables' leader rebalancing finished this round";
  return Status::OK();
}
void AutoLeaderRebalancerTask::RunLoop() {
while (
!shutdown_.WaitFor(MonoDelta::FromSeconds(FLAGS_auto_leader_rebalancing_interval_seconds))) {
if (FLAGS_auto_leader_rebalancing_enabled) {
WARN_NOT_OK(RunLeaderRebalancer(),
Substitute("the master instance isn't leader"));
}
}
}
} // namespace master
} // namespace kudu