blob: 928e3523379acc92a436e63627291e060325deac [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/master/ts_manager.h"
#include <time.h>
#include <algorithm>
#include <functional>
#include <limits>
#include <mutex>
#include <ostream>
#include <utility>
#include <boost/optional/optional.hpp>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "kudu/common/common.pb.h"
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/master/location_cache.h"
#include "kudu/master/sys_catalog.h"
#include "kudu/master/ts_descriptor.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/logging.h"
#include "kudu/util/metrics.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/trace.h"
// When set, TSManager::RegisterTS() passes the tablet server's permanent UUID
// to the location mapping command instead of the host of the server's first
// RPC address. Meant for tests only, hence tagged hidden and unsafe.
DEFINE_bool(location_mapping_by_uuid, false,
"Whether the location command is given tablet server identifier "
"instead of hostname/IP address (for tests only).");
TAG_FLAG(location_mapping_by_uuid, hidden);
TAG_FLAG(location_mapping_by_uuid, unsafe);
// Function gauge reporting the replica-count skew across tablet servers. The
// TSManager constructor binds it to TSManager::ClusterSkew(), which only
// considers tablet servers that are not presumed dead.
METRIC_DEFINE_gauge_int32(server, cluster_replica_skew,
"Cluster Replica Skew",
kudu::MetricUnit::kTablets,
"The difference between the number of replicas on "
"the tablet server hosting the most replicas and "
"the number of replicas on the tablet server hosting "
"the least replicas.",
kudu::MetricLevel::kWarn);
using kudu::pb_util::SecureShortDebugString;
using std::lock_guard;
using std::unordered_set;
using std::shared_ptr;
using std::string;
using strings::Substitute;
namespace kudu {
namespace master {
// Catalog visitor used by TSManager::ReloadTServerStates() to rebuild the
// in-memory tserver state map from the entries persisted in the sys catalog.
class TServerStateLoader : public TServerStateVisitor {
 public:
  explicit TServerStateLoader(TSManager* ts_manager)
      : ts_manager_(ts_manager) {}

  // Records the persisted state of 'tserver_id' in the TSManager's state map.
  // Entries whose state is UNKNOWN_STATE are skipped with a warning; an entry
  // without a timestamp is recorded with timestamp -1. The TSManager's
  // tserver-state lock must be held for writing.
  Status Visit(const std::string& tserver_id,
               const SysTServerStateEntryPB& metadata) override {
    ts_manager_->ts_state_lock_.AssertAcquiredForWriting();

    const TServerStatePB loaded_state = metadata.state();
    if (loaded_state == TServerStatePB::UNKNOWN_STATE) {
      LOG(WARNING) << Substitute("ignoring unknown tserver state: $0", metadata.state());
      return Status::OK();
    }
    DCHECK_NE(TServerStatePB::NONE, loaded_state);

    int64_t recorded_at = -1;
    if (metadata.has_timestamp_secs()) {
      recorded_at = metadata.timestamp_secs();
    }
    InsertOrDie(&ts_manager_->ts_state_by_uuid_, tserver_id, { loaded_state, recorded_at });
    return Status::OK();
  }

 private:
  // The TSManager whose state map is being populated; not owned.
  TSManager* ts_manager_;
};
// Creates a TSManager that resolves tablet server locations through
// 'location_cache' (may be null, in which case no location is assigned) and
// exposes the cluster_replica_skew gauge on 'metric_entity'.
TSManager::TSManager(LocationCache* location_cache,
                     const scoped_refptr<MetricEntity>& metric_entity)
    : ts_state_lock_(RWMutex::Priority::PREFER_READING),
      location_cache_(location_cache) {
  // The gauge callback captures 'this'; it is registered with
  // 'metric_detacher_' -- presumably so that it is detached before this
  // object goes away (verify against FunctionGaugeDetacher).
  auto compute_skew = [this]() { return ClusterSkew(); };
  METRIC_cluster_replica_skew.InstantiateFunctionGauge(metric_entity, compute_skew)
      ->AutoDetach(&metric_detacher_);
}
// Nothing to release explicitly; members clean up after themselves.
TSManager::~TSManager() = default;
// Looks up the registered tablet server whose UUID is instance.permanent_uuid()
// and stores its descriptor in 'ts_desc'. Returns NotFound if no such server is
// registered, or if its latest sequence number differs from the one carried by
// 'instance'.
Status TSManager::LookupTS(const NodeInstancePB& instance,
                           shared_ptr<TSDescriptor>* ts_desc) const {
  shared_lock<rw_spinlock> guard(lock_);
  const auto* entry = FindOrNull(servers_by_id_, instance.permanent_uuid());
  if (entry == nullptr) {
    return Status::NotFound("unknown tablet server ID",
                            SecureShortDebugString(instance));
  }
  // The caller's instance must match the most recent incarnation this
  // manager has seen; anything else is reported as a miss.
  if ((*entry)->latest_seqno() != instance.instance_seqno()) {
    return Status::NotFound("mismatched instance sequence number",
                            SecureShortDebugString(instance));
  }
  *ts_desc = *entry;
  return Status::OK();
}
bool TSManager::LookupTSByUUID(const string& uuid,
std::shared_ptr<TSDescriptor>* ts_desc) const {
shared_lock<rw_spinlock> l(lock_);
return FindCopy(servers_by_id_, uuid, ts_desc);
}
// Registers a new tablet server, or re-registers a known one, from 'instance'
// and 'registration', and returns its descriptor in 'desc'.
//
// When a location cache is configured, the server's location is resolved
// before 'lock_' is taken, because running the location mapping command is
// relatively slow.
//
// Returns:
//   - InvalidArgument if 'registration' carries no RPC address;
//   - the error from location resolution, if it fails;
//   - any error from TSDescriptor::Register() / TSDescriptor::RegisterNew().
Status TSManager::RegisterTS(const NodeInstancePB& instance,
                             const ServerRegistrationPB& registration,
                             DnsResolver* dns_resolver,
                             std::shared_ptr<TSDescriptor>* desc) {
  // Pre-condition: registration info should contain at least one RPC end-point.
  if (registration.rpc_addresses().empty()) {
    return Status::InvalidArgument(
        "invalid registration: must have at least one RPC address",
        SecureShortDebugString(registration));
  }
  const string& uuid = instance.permanent_uuid();

  // Assign the location for the tablet server outside the lock: assigning
  // a location involves calling the location mapping script which is a
  // relatively long and expensive operation.
  boost::optional<string> location;
  if (location_cache_) {
    // In some test scenarios the location is assigned per tablet server UUID.
    // That's the case when multiple (or even all) tablet servers have the same
    // IP address for their RPC endpoint.
    const auto& cmd_arg = PREDICT_FALSE(FLAGS_location_mapping_by_uuid)
        ? uuid : registration.rpc_addresses(0).host();
    TRACE(Substitute("tablet server $0: assigning location", uuid));
    string location_str;
    const auto s = location_cache_->GetLocation(cmd_arg, &location_str);
    // If location resolution fails, log the error and return the status.
    if (!s.ok()) {
      // rpc_addresses() is non-empty: it was validated at the top.
      const auto& addr = registration.rpc_addresses(0);
      KLOG_EVERY_N_SECS(ERROR, 60) << Substitute(
          "Unable to assign location to tablet server $0: $1",
          Substitute("$0 ($1:$2)", uuid, addr.host(), addr.port()),
          s.ToString());
      return s;
    }
    // Trace the assignment only once resolution is known to have succeeded.
    // Otherwise a failed lookup would be traced as "assigned location ''".
    TRACE(Substitute(
        "tablet server $0: assigned location '$1'", uuid, location_str));
    location.emplace(std::move(location_str));
  }

  shared_ptr<TSDescriptor> descriptor;
  bool new_tserver = false;
  {
    lock_guard<rw_spinlock> l(lock_);
    auto* descriptor_ptr = FindOrNull(servers_by_id_, uuid);
    if (descriptor_ptr) {
      descriptor = *descriptor_ptr;
      RETURN_NOT_OK(descriptor->Register(
          instance, registration, location, dns_resolver));
    } else {
      RETURN_NOT_OK(TSDescriptor::RegisterNew(
          instance, registration, location, dns_resolver, &descriptor));
      InsertOrDie(&servers_by_id_, uuid, descriptor);
      new_tserver = true;
    }
  }
  LOG(INFO) << Substitute("$0 tserver with Master: $1",
                          new_tserver ? "Registered new" : "Re-registered known",
                          descriptor->ToString());
  *desc = std::move(descriptor);
  return Status::OK();
}
// Replaces the contents of 'descs' with every registered tablet server
// descriptor, including servers presumed dead.
void TSManager::GetAllDescriptors(TSDescriptorVector* descs) const {
  descs->clear();
  shared_lock<rw_spinlock> guard(lock_);
  descs->reserve(servers_by_id_.size());
  for (const auto& id_and_desc : servers_by_id_) {
    descs->push_back(id_and_desc.second);
  }
}
int TSManager::GetCount() const {
shared_lock<rw_spinlock> l(lock_);
return servers_by_id_.size();
}
int TSManager::GetLiveCount() const {
shared_lock<rw_spinlock> l(lock_);
int live_count = 0;
for (const auto& entry : servers_by_id_) {
const shared_ptr<TSDescriptor>& ts = entry.second;
if (!ts->PresumedDead()) {
live_count++;
}
}
return live_count;
}
// Returns the UUIDs of all tablet servers whose recorded state is
// MAINTENANCE_MODE.
unordered_set<string> TSManager::GetUuidsToIgnoreForUnderreplication() const {
  shared_lock<RWMutex> guard(ts_state_lock_);
  unordered_set<string> in_maintenance;
  in_maintenance.reserve(ts_state_by_uuid_.size());
  for (const auto& entry : ts_state_by_uuid_) {
    const string& tserver_uuid = entry.first;
    const TServerStatePB state = entry.second.first;
    if (state != TServerStatePB::MAINTENANCE_MODE) {
      continue;
    }
    in_maintenance.emplace(tserver_uuid);
  }
  return in_maintenance;
}
// Returns a snapshot copy of the tserver state map, taken under the state lock.
TServerStateMap TSManager::GetTServerStates() const {
  shared_lock<RWMutex> guard(ts_state_lock_);
  TServerStateMap snapshot = ts_state_by_uuid_;
  return snapshot;
}
// Replaces the contents of 'descs' with the descriptors of tablet servers that
// AvailableForPlacementUnlocked() accepts.
void TSManager::GetDescriptorsAvailableForPlacement(TSDescriptorVector* descs) const {
  descs->clear();
  // Take the tserver-state lock before 'lock_', the same order used by
  // SetTServerState().
  shared_lock<RWMutex> state_guard(ts_state_lock_);
  shared_lock<rw_spinlock> servers_guard(lock_);
  descs->reserve(servers_by_id_.size());
  for (const auto& entry : servers_by_id_) {
    const shared_ptr<TSDescriptor>& candidate = entry.second;
    if (!AvailableForPlacementUnlocked(*candidate)) {
      continue;
    }
    descs->push_back(candidate);
  }
}
// Changes the state of tablet server 'ts_uuid' to 'ts_state', both in the sys
// catalog and in memory. This is a no-op if the server is already in that state.
//
// Unless 'handle_missing_ts' is ALLOW_MISSING_TSERVER, returns NotFound when the
// server has no recorded state and is not registered. Setting NONE removes the
// persisted state and asks every tablet server for a full tablet report.
//
// The sys catalog is written before the in-memory map, so a failed write leaves
// the in-memory state unchanged.
Status TSManager::SetTServerState(const string& ts_uuid,
                                  TServerStatePB ts_state,
                                  ChangeTServerStateRequestPB::HandleMissingTS handle_missing_ts,
                                  SysCatalogTable* sys_catalog) {
  lock_guard<RWMutex> guard(ts_state_lock_);
  const TServerStatePB current_state = FindWithDefault(
      ts_state_by_uuid_, ts_uuid, { TServerStatePB::NONE, -1 }).first;
  if (current_state == ts_state) {
    return Status::OK();
  }

  // If there is no existing state for the tserver, and the tserver hasn't yet
  // been registered, return an error, as appropriate.
  const bool allow_missing =
      handle_missing_ts == ChangeTServerStateRequestPB::ALLOW_MISSING_TSERVER;
  if (!allow_missing && current_state == TServerStatePB::NONE) {
    shared_ptr<TSDescriptor> unused_desc;
    if (!LookupTSByUUID(ts_uuid, &unused_desc)) {
      return Status::NotFound(Substitute("Requested tserver $0 has not been registered",
                                         ts_uuid));
    }
  }

  if (ts_state == TServerStatePB::NONE) {
    RETURN_NOT_OK_PREPEND(sys_catalog->RemoveTServerState(ts_uuid),
                          Substitute("Failed to remove tserver state for $0", ts_uuid));
    ts_state_by_uuid_.erase(ts_uuid);
    // If exiting maintenance mode, make sure that any replica failures that
    // may have been ignored while in maintenance mode are reprocessed. To do
    // so, request full tablet reports across all tablet servers.
    SetAllTServersNeedFullTabletReports();
    LOG(INFO) << Substitute("Unset tserver state for $0 from $1",
                            ts_uuid, TServerStatePB_Name(current_state));
    return Status::OK();
  }

  const int64_t now_secs = time(nullptr);
  SysTServerStateEntryPB entry;
  entry.set_state(ts_state);
  entry.set_timestamp_secs(now_secs);
  RETURN_NOT_OK_PREPEND(sys_catalog->WriteTServerState(ts_uuid, entry),
                        Substitute("Failed to set tserver state for $0 to $1",
                                   ts_uuid, TServerStatePB_Name(ts_state)));
  LOG(INFO) << Substitute("Set tserver state for $0 to $1",
                          ts_uuid, TServerStatePB_Name(ts_state));
  InsertOrUpdate(&ts_state_by_uuid_, ts_uuid, { ts_state, now_secs });
  return Status::OK();
}
// Returns the recorded state of 'ts_uuid', or NONE if it has none. The caller
// must already hold 'ts_state_lock_'.
TServerStatePB TSManager::GetTServerStateUnlocked(const string& ts_uuid) const {
  ts_state_lock_.AssertAcquired();
  const auto* state_and_ts = FindOrNull(ts_state_by_uuid_, ts_uuid);
  if (state_and_ts == nullptr) {
    return TServerStatePB::NONE;
  }
  return state_and_ts->first;
}
// Locking wrapper around GetTServerStateUnlocked(): takes 'ts_state_lock_' in
// shared mode for the duration of the lookup.
TServerStatePB TSManager::GetTServerState(const string& ts_uuid) const {
  shared_lock<RWMutex> reader(ts_state_lock_);
  const TServerStatePB state = GetTServerStateUnlocked(ts_uuid);
  return state;
}
// Discards the in-memory tserver states and reloads them from 'sys_catalog'.
// The state lock is held exclusively throughout, which TServerStateLoader's
// Visit() asserts.
Status TSManager::ReloadTServerStates(SysCatalogTable* sys_catalog) {
  lock_guard<RWMutex> writer(ts_state_lock_);
  ts_state_by_uuid_.clear();
  TServerStateLoader state_loader(this);
  return sys_catalog->VisitTServerStates(&state_loader);
}
void TSManager::SetAllTServersNeedFullTabletReports() {
lock_guard<rw_spinlock> l(lock_);
for (auto& id_and_desc : servers_by_id_) {
id_and_desc.second->UpdateNeedsFullTabletReport(true);
}
}
int TSManager::ClusterSkew() const {
int min_count = std::numeric_limits<int>::max();
int max_count = 0;
shared_lock<rw_spinlock> l(lock_);
for (const TSDescriptorMap::value_type& entry : servers_by_id_) {
const shared_ptr<TSDescriptor>& ts = entry.second;
if (ts->PresumedDead()) {
continue;
}
int num_live_replicas = ts->num_live_replicas();
min_count = std::min(min_count, num_live_replicas);
max_count = std::max(max_count, num_live_replicas);
}
return max_count - min_count;
}
// Returns true if replicas may be placed on 'ts'. That requires 'ts' to be out
// of maintenance mode and not presumed dead. The caller must hold
// 'ts_state_lock_'.
bool TSManager::AvailableForPlacementUnlocked(const TSDescriptor& ts) const {
  ts_state_lock_.AssertAcquired();
  // TODO(KUDU-1827): this should also be used when decommissioning a server.
  const bool in_maintenance =
      GetTServerStateUnlocked(ts.permanent_uuid()) == TServerStatePB::MAINTENANCE_MODE;
  // A server that has heartbeated recently enough (i.e. is not presumed dead)
  // is considered alive. PresumedDead() is only consulted when the server is
  // not in maintenance mode.
  return !in_maintenance && !ts.PresumedDead();
}
} // namespace master
} // namespace kudu