// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "kudu/master/master.h"

#include <algorithm>
#include <functional>
#include <memory>
#include <ostream>
#include <string>
#include <vector>

#include <gflags/gflags.h>
#include <glog/logging.h>

#include "kudu/cfile/block_cache.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/consensus/metadata.pb.h"
#include "kudu/consensus/raft_consensus.h"
#include "kudu/fs/error_manager.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/master/catalog_manager.h"
#include "kudu/master/location_cache.h"
#include "kudu/master/master.pb.h"
#include "kudu/master/master.proxy.h"
#include "kudu/master/master_cert_authority.h"
#include "kudu/master/master_path_handlers.h"
#include "kudu/master/master_service.h"
#include "kudu/master/ts_manager.h"
#include "kudu/master/txn_manager.h"
#include "kudu/master/txn_manager_service.h"
#include "kudu/rpc/messenger.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/rpc/service_if.h"
#include "kudu/security/token_signer.h"
#include "kudu/server/rpc_server.h"
#include "kudu/server/webserver.h"
#include "kudu/tserver/tablet_copy_service.h"
#include "kudu/tserver/tablet_service.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/logging.h"
#include "kudu/util/maintenance_manager.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/status.h"
#include "kudu/util/threadpool.h"
#include "kudu/util/version_info.h"
DEFINE_int32(master_registration_rpc_timeout_ms, 1500,
"Timeout for retrieving master registration over RPC.");
TAG_FLAG(master_registration_rpc_timeout_ms, experimental);

DEFINE_int64(tsk_rotation_seconds, 60 * 60 * 24 * 1,
             "Number of seconds between consecutive activations of newly "
             "generated TSKs (Token Signing Keys).");
TAG_FLAG(tsk_rotation_seconds, advanced);
TAG_FLAG(tsk_rotation_seconds, experimental);

DEFINE_int64(authn_token_validity_seconds, 60 * 60 * 24 * 7,
             "Period of time for which an issued authentication token is valid. "
             "Clients will automatically attempt to reacquire a token after the "
             "validity period expires.");
TAG_FLAG(authn_token_validity_seconds, experimental);

DEFINE_int64(authz_token_validity_seconds, 60 * 5,
             "Period of time for which an issued authorization token is valid. "
             "Clients will automatically attempt to reacquire a token after the "
             "validity period expires.");
TAG_FLAG(authz_token_validity_seconds, experimental);

DEFINE_string(location_mapping_cmd, "",
              "A Unix command which takes a single argument, the IP address or "
              "hostname of a tablet server or client, and returns the location "
              "string for it. A location string begins with a / and consists of "
              "/-separated tokens, each of which contains only characters from "
              "the set [a-zA-Z0-9_-.]. If the cluster is not using location "
              "awareness features, this flag should not be set.");
DECLARE_bool(txn_manager_lazily_initialized);
DECLARE_bool(txn_manager_enabled);

using kudu::consensus::RaftPeerPB;
using kudu::fs::ErrorHandlerType;
using kudu::rpc::ServiceIf;
using kudu::security::TokenSigner;
using kudu::transactions::TxnManager;
using kudu::transactions::TxnManagerServiceImpl;
using kudu::tserver::ConsensusServiceImpl;
using kudu::tserver::TabletCopyServiceImpl;
using std::min;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::vector;
using strings::Substitute;

namespace kudu {
namespace rpc {
class RpcContext;
} // namespace rpc
} // namespace kudu

namespace kudu {
namespace master {

namespace {

constexpr const char* kReplaceMasterMessage =
    "this master may return incorrect results and should be replaced";

void CrashMasterOnDiskError(const string& uuid) {
  LOG(FATAL) << Substitute("Disk error detected on data directory $0: $1",
                           uuid, kReplaceMasterMessage);
}

void CrashMasterOnCFileCorruption(const string& tablet_id) {
  LOG(FATAL) << Substitute("CFile corruption detected on system catalog $0: $1",
                           tablet_id, kReplaceMasterMessage);
}

void CrashMasterOnKudu2233Corruption(const string& tablet_id) {
  LOG(FATAL) << Substitute("KUDU-2233 corruption detected on system catalog $0: $1",
                           tablet_id, kReplaceMasterMessage);
}

// TODO(Alex Feinberg) this method should be moved to a separate class (along with
// ListMasters), so that it can also be used in TS and client when
// bootstrapping.
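// Fetches the registration of the master at 'hostport' via a
// GetMasterRegistration RPC, filling in 'e' with the instance ID,
// registration, role, and, when reported, the cluster ID and Raft member
// type. The RPC times out after --master_registration_rpc_timeout_ms.
// A hypothetical call:
//
//   ServerEntryPB entry;
//   RETURN_NOT_OK(GetMasterEntryForHost(
//       messenger, HostPort("master-1.example.com", 7051), &entry));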
Status GetMasterEntryForHost(const shared_ptr<rpc::Messenger>& messenger,
const HostPort& hostport,
ServerEntryPB* e) {
Sockaddr sockaddr;
RETURN_NOT_OK(SockaddrFromHostPort(hostport, &sockaddr));
MasterServiceProxy proxy(messenger, sockaddr, hostport.host());
GetMasterRegistrationRequestPB req;
GetMasterRegistrationResponsePB resp;
rpc::RpcController controller;
controller.set_timeout(MonoDelta::FromMilliseconds(FLAGS_master_registration_rpc_timeout_ms));
RETURN_NOT_OK(proxy.GetMasterRegistration(req, &resp, &controller));
e->mutable_instance_id()->CopyFrom(resp.instance_id());
if (resp.has_error()) {
return StatusFromPB(resp.error().status());
}
e->mutable_registration()->CopyFrom(resp.registration());
e->set_role(resp.role());
if (resp.has_cluster_id()) {
e->set_cluster_id(resp.cluster_id());
}
if (resp.has_member_type()) {
e->set_member_type(resp.member_type());
}
return Status::OK();
}
} // anonymous namespace
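
// Note: the TxnManager is instantiated only if --txn_manager_enabled is set,
// and the location cache only if --location_mapping_cmd is non-empty.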
Master::Master(const MasterOptions& opts)
: KuduServer("Master", opts, "kudu.master"),
state_(kStopped),
catalog_manager_(new CatalogManager(this)),
txn_manager_(FLAGS_txn_manager_enabled ? new TxnManager(this) : nullptr),
path_handlers_(new MasterPathHandlers(this)),
opts_(opts),
registration_initialized_(false) {
const auto& location_cmd = FLAGS_location_mapping_cmd;
if (!location_cmd.empty()) {
location_cache_.reset(new LocationCache(location_cmd, metric_entity_.get()));
}
ts_manager_.reset(new TSManager(location_cache_.get(), metric_entity_));
}
Master::~Master() {
ShutdownImpl();
}
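
// One-time initialization of the master's components: block cache
// instrumentation, the single-threaded 'init' pool used for deferred
// startup work, web UI path handlers, the maintenance manager, the
// certificate authority, and the token signer. Transitions the master
// from kStopped to kInitialized.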
Status Master::Init() {
CHECK_EQ(kStopped, state_);
cfile::BlockCache::GetSingleton()->StartInstrumentation(metric_entity());
RETURN_NOT_OK(ThreadPoolBuilder("init").set_max_threads(1).Build(&init_pool_));
RETURN_NOT_OK(KuduServer::Init());
if (web_server_) {
RETURN_NOT_OK(path_handlers_->Register(web_server_.get()));
}
maintenance_manager_.reset(new MaintenanceManager(
MaintenanceManager::kDefaultOptions,
fs_manager_->uuid(),
metric_entity()));
  // The certificate authority object is initialized upon loading the CA
  // private key and certificate from the system table when this server
  // becomes the leader master.
cert_authority_.reset(new MasterCertAuthority(fs_manager_->uuid()));
// The TokenSigner loads its keys during catalog manager initialization.
token_signer_.reset(new TokenSigner(
FLAGS_authn_token_validity_seconds,
FLAGS_authz_token_validity_seconds,
FLAGS_tsk_rotation_seconds,
messenger_->shared_token_verifier()));
state_ = kInitialized;
return Status::OK();
}
Status Master::Start() {
RETURN_NOT_OK(StartAsync());
RETURN_NOT_OK(WaitForCatalogManagerInit());
google::FlushLogFiles(google::INFO); // Flush the startup messages.
return Status::OK();
}
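
// Begins the startup sequence without waiting for the catalog manager:
// installs fatal error callbacks, starts the maintenance manager, registers
// and starts the RPC services, and schedules catalog manager (and, unless
// lazily initialized, TxnManager) initialization on 'init_pool_'. Callers
// that need a fully functional master should use Start() or call
// WaitForCatalogManagerInit() afterwards.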
Status Master::StartAsync() {
CHECK_EQ(kInitialized, state_);
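  // Crash on unrecoverable disk or system-catalog corruption: a master with
  // corrupted state may return incorrect results and should be replaced.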
fs_manager_->SetErrorNotificationCb(ErrorHandlerType::DISK_ERROR,
&CrashMasterOnDiskError);
fs_manager_->SetErrorNotificationCb(ErrorHandlerType::CFILE_CORRUPTION,
&CrashMasterOnCFileCorruption);
fs_manager_->SetErrorNotificationCb(ErrorHandlerType::KUDU_2233_CORRUPTION,
&CrashMasterOnKudu2233Corruption);
RETURN_NOT_OK(maintenance_manager_->Start());
unique_ptr<ServiceIf> impl(new MasterServiceImpl(this));
unique_ptr<ServiceIf> consensus_service(
new ConsensusServiceImpl(this, catalog_manager_.get()));
unique_ptr<ServiceIf> tablet_copy_service(
new TabletCopyServiceImpl(this, catalog_manager_.get()));
unique_ptr<ServiceIf> txn_manager_service(
txn_manager_ ? new TxnManagerServiceImpl(this) : nullptr);
RETURN_NOT_OK(RegisterService(std::move(impl)));
RETURN_NOT_OK(RegisterService(std::move(consensus_service)));
RETURN_NOT_OK(RegisterService(std::move(tablet_copy_service)));
if (txn_manager_service) {
RETURN_NOT_OK(RegisterService(std::move(txn_manager_service)));
}
RETURN_NOT_OK(KuduServer::Start());
// Now that we've bound, construct our ServerRegistrationPB.
RETURN_NOT_OK(InitMasterRegistration());
// Start initializing the catalog manager.
RETURN_NOT_OK(init_pool_->Submit([this]() {
this->InitCatalogManagerTask();
}));
if (txn_manager_ && !FLAGS_txn_manager_lazily_initialized) {
// Start initializing the TxnManager.
RETURN_NOT_OK(ScheduleTxnManagerInit());
}
state_ = kRunning;
return Status::OK();
}
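
// Runs on 'init_pool_': initializes the catalog manager and publishes the
// resulting status via 'catalog_manager_init_status_' so that
// WaitForCatalogManagerInit() can surface it.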
void Master::InitCatalogManagerTask() {
Status s = InitCatalogManager();
if (!s.ok()) {
LOG(ERROR) << "Unable to init master catalog manager: " << s.ToString();
}
catalog_manager_init_status_.Set(s);
}
Status Master::InitCatalogManager() {
if (catalog_manager_->IsInitialized()) {
return Status::IllegalState("Catalog manager is already initialized");
}
RETURN_NOT_OK_PREPEND(catalog_manager_->Init(is_first_run_),
"Unable to initialize catalog manager");
return Status::OK();
}
Status Master::WaitForCatalogManagerInit() const {
CHECK_EQ(state_, kRunning);
return catalog_manager_init_status_.Get();
}
Status Master::ScheduleTxnManagerInit() {
DCHECK(txn_manager_);
return init_pool_->Submit([this]() { this->InitTxnManagerTask(); });
}
void Master::InitTxnManagerTask() {
DCHECK(txn_manager_);
  // Successful initialization of the TxnManager requires enough tablet
  // servers running in the cluster. Since a Kudu master may start in
  // environments where tablet servers come up long after the master, in
  // non-lazy initialization mode (controlled by the
  // --txn_manager_lazily_initialized flag) this task retries indefinitely
  // until the TxnManager is initialized and ready to handle requests.
Status s;
while (true) {
if (state_ == kStopping || state_ == kStopped) {
s = Status::Incomplete("shut down while trying to initialize TxnManager");
break;
}
s = InitTxnManager();
if (s.ok()) {
break;
}
// TODO(aserbin): if retrying every second looks too often, consider adding
// exponential back-off and adding condition variables to
// wake up a long-awaiting task and retry initialization
// right away when TxnManager receives a call.
static const MonoDelta kRetryInterval = MonoDelta::FromSeconds(1);
KLOG_EVERY_N_SECS(WARNING, 60) << Substitute(
"$0: unable to init TxnManager, will retry in $1",
s.ToString(), kRetryInterval.ToString());
SleepFor(kRetryInterval);
}
txn_manager_init_status_.Set(s);
}
Status Master::InitTxnManager() {
if (!txn_manager_) {
return Status::IllegalState("TxnManager is not enabled");
}
RETURN_NOT_OK_PREPEND(txn_manager_->Init(), "unable to initialize TxnManager");
return Status::OK();
}
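
// Waits for the TxnManager to finish initializing. An uninitialized
// 'timeout' means wait indefinitely; otherwise a TimedOut error is
// returned if initialization hasn't completed within 'timeout'.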
Status Master::WaitForTxnManagerInit(const MonoDelta& timeout) const {
if (timeout.Initialized()) {
const Status* s = txn_manager_init_status_.WaitFor(timeout);
if (!s) {
return Status::TimedOut("timed out waiting for TxnManager to initialize");
}
}
return txn_manager_init_status_.Get();
}
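
// Test-only helper: polls with exponential backoff (1 ms, doubling up to
// 256 ms per retry) until this master's catalog manager is the leader and
// ready, returning TimedOut if that doesn't happen within 'timeout'.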
Status Master::WaitUntilCatalogManagerIsLeaderAndReadyForTests(const MonoDelta& timeout) {
  Status s;
  MonoTime start = MonoTime::Now();
  int backoff_ms = 1;
  const int kMaxBackoffMs = 256;
  do {
    {
      CatalogManager::ScopedLeaderSharedLock l(catalog_manager_.get());
      // Capture the failure so the TimedOut status below reports the last
      // error rather than an empty (OK) status.
      s = l.first_failed_status();
      if (s.ok()) {
        return Status::OK();
      }
    }
    SleepFor(MonoDelta::FromMilliseconds(backoff_ms));
    backoff_ms = min(backoff_ms << 1, kMaxBackoffMs);
  } while (MonoTime::Now() < (start + timeout));
  return Status::TimedOut("Maximum time exceeded waiting for master leadership",
                          s.ToString());
}
Status Master::GetMasterRegistration(ServerRegistrationPB* reg) const {
if (!registration_initialized_.load(std::memory_order_acquire)) {
return Status::ServiceUnavailable("Master startup not complete");
}
reg->CopyFrom(registration_);
return Status::OK();
}
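
// Builds this master's ServerRegistrationPB from the advertised RPC and
// HTTP addresses once the server has bound its ports, and publishes it
// for GetMasterRegistration() callers.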
Status Master::InitMasterRegistration() {
CHECK(!registration_initialized_.load());
ServerRegistrationPB reg;
vector<Sockaddr> rpc_addrs;
RETURN_NOT_OK_PREPEND(rpc_server()->GetAdvertisedAddresses(&rpc_addrs),
"Couldn't get RPC addresses");
RETURN_NOT_OK(AddHostPortPBs(rpc_addrs, reg.mutable_rpc_addresses()));
if (web_server()) {
vector<Sockaddr> http_addrs;
RETURN_NOT_OK(web_server()->GetAdvertisedAddresses(&http_addrs));
RETURN_NOT_OK(AddHostPortPBs(http_addrs, reg.mutable_http_addresses()));
reg.set_https_enabled(web_server()->IsSecure());
}
reg.set_software_version(VersionInfo::GetVersionInfo());
reg.set_start_time(start_walltime_);
registration_.Swap(&reg);
registration_initialized_.store(true);
return Status::OK();
}
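
// Idempotent shutdown: stops accepting new RPCs, shuts down the master's
// subsystems, then the generic server machinery. Also invoked from the
// destructor, so it's safe to call more than once.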
void Master::ShutdownImpl() {
if (kInitialized == state_ || kRunning == state_) {
const string name = rpc_server_->ToString();
LOG(INFO) << "Master@" << name << " shutting down...";
state_ = kStopping;
// 1. Stop accepting new RPCs.
UnregisterAllServices();
// 2. Shut down the master's subsystems.
init_pool_->Shutdown();
maintenance_manager_->Shutdown();
catalog_manager_->Shutdown();
    // Unregister all of the error-notification callbacks installed in
    // StartAsync().
    fs_manager_->UnsetErrorNotificationCb(ErrorHandlerType::DISK_ERROR);
    fs_manager_->UnsetErrorNotificationCb(ErrorHandlerType::CFILE_CORRUPTION);
    fs_manager_->UnsetErrorNotificationCb(ErrorHandlerType::KUDU_2233_CORRUPTION);
// 3. Shut down generic subsystems.
KuduServer::Shutdown();
LOG(INFO) << "Master@" << name << " shutdown complete.";
}
state_ = kStopped;
}
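
// Lists all masters in the committed Raft config. With multiple masters,
// each peer is contacted over RPC; a per-peer failure is recorded in that
// entry's 'error' field instead of failing the whole call.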
Status Master::ListMasters(vector<ServerEntryPB>* masters) const {
auto consensus = catalog_manager_->master_consensus();
if (!consensus) {
return Status::IllegalState("consensus not running");
}
const auto config = consensus->CommittedConfig();
masters->clear();
DCHECK_GE(config.peers_size(), 1);
  // Optimized code path for single-master configurations that avoids
  // reaching out to other masters over the network.
if (config.peers_size() == 1) {
ServerEntryPB local_entry;
local_entry.mutable_instance_id()->CopyFrom(catalog_manager_->NodeInstance());
RETURN_NOT_OK(GetMasterRegistration(local_entry.mutable_registration()));
local_entry.set_role(RaftPeerPB::LEADER);
local_entry.set_cluster_id(catalog_manager_->GetClusterId());
local_entry.set_member_type(RaftPeerPB::VOTER);
masters->emplace_back(std::move(local_entry));
return Status::OK();
}
  // For distributed (multi-master) configurations.
for (const auto& peer : config.peers()) {
HostPort hp = HostPortFromPB(peer.last_known_addr());
ServerEntryPB peer_entry;
Status s = GetMasterEntryForHost(messenger_, hp, &peer_entry);
if (!s.ok()) {
s = s.CloneAndPrepend(
Substitute("Unable to get registration information for peer $0 ($1)",
peer.permanent_uuid(),
hp.ToString()));
LOG(WARNING) << s.ToString();
StatusToPB(s, peer_entry.mutable_error());
} else if (peer_entry.instance_id().permanent_uuid() != peer.permanent_uuid()) {
StatusToPB(Status::IllegalState(
Substitute("mismatched UUIDs: expected UUID $0 from master at $1, but got UUID $2",
peer.permanent_uuid(),
hp.ToString(),
peer_entry.instance_id().permanent_uuid())),
peer_entry.mutable_error());
}
masters->emplace_back(std::move(peer_entry));
}
return Status::OK();
}
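
// Returns the host/port of every master in the committed Raft config,
// optionally restricted to a particular member type (e.g. voters only).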
Status Master::GetMasterHostPorts(vector<HostPort>* hostports, MasterType type) const {
auto consensus = catalog_manager_->master_consensus();
if (!consensus) {
return Status::IllegalState("consensus not running");
}
auto get_raft_member_type = [] (MasterType type) constexpr {
switch (type) {
case MasterType::VOTER_ONLY:
return RaftPeerPB::VOTER;
default:
LOG(FATAL) << "No matching Raft member type for master type: " << type;
}
};
hostports->clear();
consensus::RaftConfigPB config = consensus->CommittedConfig();
for (const auto& peer : *config.mutable_peers()) {
if (type == MasterType::ALL || get_raft_member_type(type) == peer.member_type()) {
// In non-distributed master configurations, we may not store our own
// last known address in the Raft config. So, we'll fill it in from
// the server Registration instead.
if (!peer.has_last_known_addr()) {
DCHECK_EQ(config.peers_size(), 1);
DCHECK(registration_initialized_.load());
DCHECK_GT(registration_.rpc_addresses_size(), 0);
hostports->emplace_back(HostPortFromPB(registration_.rpc_addresses(0)));
} else {
hostports->emplace_back(HostPortFromPB(peer.last_known_addr()));
}
}
}
return Status::OK();
}
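
// Handles a request to add a master at 'hp': verifies the master isn't
// already part of the cluster, confirms it's reachable (fetching its UUID
// along the way), and initiates a Raft config change via the catalog
// manager.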
Status Master::AddMaster(const HostPort& hp, rpc::RpcContext* rpc) {
  // Ensure the master to be added is not already among the cluster's masters.
vector<HostPort> masters;
// Here the check is made against committed config with all member types.
RETURN_NOT_OK(GetMasterHostPorts(&masters, MasterType::ALL));
if (std::find(masters.begin(), masters.end(), hp) != masters.end()) {
return Status::AlreadyPresent("Master already present");
}
// Check whether the master to be added is reachable and fetch its uuid.
ServerEntryPB peer_entry;
RETURN_NOT_OK(GetMasterEntryForHost(messenger_, hp, &peer_entry));
const auto& peer_uuid = peer_entry.instance_id().permanent_uuid();
  // No early check for whether a config change is already in progress: if
  // one is, Raft will return an error when the config change is initiated.
return catalog_manager()->InitiateMasterChangeConfig(CatalogManager::kAddMaster, hp, peer_uuid,
rpc);
}
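
// Handles a request to remove the master at 'hp' (optionally disambiguated
// by 'uuid'): rejects removing the only master of a single-master
// configuration or the leader master itself, then initiates a Raft config
// change via the catalog manager.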
Status Master::RemoveMaster(const HostPort& hp, const string& uuid, rpc::RpcContext* rpc) {
  // Ensure the master to be removed is among the cluster's masters.
auto consensus = catalog_manager_->master_consensus();
if (!consensus) {
return Status::IllegalState("consensus not running");
}
consensus::RaftConfigPB config = consensus->CommittedConfig();
  // We can't allow removing a master from a single-master configuration. The
  // following check ensures a more appropriate error message is returned in
  // case the removal was targeted at a different cluster.
if (config.peers_size() == 1) {
bool hp_found;
if (!config.peers(0).has_last_known_addr()) {
// In non-distributed master configurations, we may not store our own
// last known address in the Raft config.
DCHECK(registration_initialized_.load());
DCHECK_GT(registration_.rpc_addresses_size(), 0);
const auto& addresses = registration_.rpc_addresses();
hp_found = std::find_if(addresses.begin(), addresses.end(),
[&hp](const auto &hp_pb) {
return HostPortFromPB(hp_pb) == hp;
}) != addresses.end();
} else {
hp_found = HostPortFromPB(config.peers(0).last_known_addr()) == hp;
}
if (hp_found) {
return Status::InvalidArgument(Substitute("Can't remove master $0 in a single master "
"configuration", hp.ToString()));
}
return Status::NotFound(Substitute("Master $0 not found", hp.ToString()));
}
// UUIDs of masters matching the supplied HostPort 'hp' to remove.
vector<string> matching_masters;
for (const auto& peer : config.peers()) {
if (peer.has_last_known_addr() && HostPortFromPB(peer.last_known_addr()) == hp) {
matching_masters.push_back(peer.permanent_uuid());
}
}
string matching_uuid;
if (PREDICT_TRUE(matching_masters.size() == 1)) {
if (!uuid.empty() && uuid != matching_masters[0]) {
return Status::InvalidArgument(Substitute("Mismatch in UUID of the master $0 to be removed. "
"Expected: $1, supplied: $2.", hp.ToString(),
matching_masters[0], uuid));
}
matching_uuid = matching_masters[0];
} else if (matching_masters.empty()) {
return Status::NotFound(Substitute("Master $0 not found", hp.ToString()));
} else {
// We found multiple masters with matching HostPorts. Use the optional uuid to
// disambiguate, if possible.
DCHECK_GE(matching_masters.size(), 2);
if (!uuid.empty()) {
int matching_uuids_count = std::count(matching_masters.begin(), matching_masters.end(), uuid);
if (matching_uuids_count == 1) {
matching_uuid = uuid;
} else {
LOG(FATAL) << Substitute("Found multiple masters with same RPC address $0 and UUID $1",
hp.ToString(), uuid);
}
} else {
// Uuid not supplied and we found multiple matching HostPorts.
return Status::InvalidArgument(Substitute("Found multiple masters with same RPC address $0 "
"and following UUIDs $1. Supply UUID to "
"disambiguate.", hp.ToString(),
JoinStrings(matching_masters, ",")));
}
}
if (matching_uuid == fs_manager_->uuid()) {
return Status::InvalidArgument(Substitute("Can't remove the leader master $0", hp.ToString()));
}
  // No early check for whether a config change is already in progress: if
  // one is, Raft will return an error when the config change is initiated.
return catalog_manager()->InitiateMasterChangeConfig(CatalogManager::kRemoveMaster, hp,
matching_uuid, rpc);
}
} // namespace master
} // namespace kudu